Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Requirements

To connect to Pinot, you need:

* Pinot 0.9.3 or higher.
* Pinot 0.10.0 or higher.
* Network access from the Trino coordinator and workers to the Pinot controller
nodes. Port 8098 is the default port.

Expand Down
96 changes: 90 additions & 6 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.pinot.version>0.10.0</dep.pinot.version>
<dep.pinot.version>0.11.0</dep.pinot.version>
<!--
Project's default for air.test.parallel is 'methods'. By design, 'instances' makes TestNG run tests from one class in a single thread.
As a side effect, it prevents TestNG from initializing multiple test instances upfront, which happens with 'methods'.
Expand All @@ -39,12 +39,17 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.17.2</version>
<version>3.19.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
<version>2.30</version>
<version>2.35</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
Expand Down Expand Up @@ -166,7 +171,7 @@
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
<version>0.9.8</version>
<version>1.0.4</version>
<exclusions>
<exclusion>
<groupId>commons-io</groupId>
Expand All @@ -184,6 +189,15 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<!-- Conflicts with log4j-to-slf4j from trino-kafka -->
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -308,6 +322,10 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -464,14 +482,14 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8</version>
<version>3.11</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.11</version>
<version>4.4.13</version>
<scope>runtime</scope>
</dependency>

Expand Down Expand Up @@ -568,6 +586,12 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down Expand Up @@ -635,6 +659,66 @@
</rules>
</configuration>
</plugin>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredDependencies>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What library is contributing the duplicates? Can we improve this by shading upstream?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw it once, after the recent helix-core upgrade to 1.0.4. It brings in several helix-* artifacts that duplicate each other's classes; I don't know why.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning was about duplicates from the same version, so I think it is safe to ignore.

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-epoll</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>metrics-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>metadata-store-directory-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>zookeeper-api</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<!-- The version is a separate Maven coordinate; it must not be appended to the artifactId, or the entry matches nothing -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</dependency>
</ignoredDependencies>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
Expand All @@ -43,9 +44,9 @@
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE;
import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.CONFIG_USE_PLAIN_TEXT;
import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.GRPC_TLS_PREFIX;
import static org.apache.pinot.common.config.GrpcConfig.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE;
import static org.apache.pinot.common.config.GrpcConfig.CONFIG_USE_PLAIN_TEXT;
import static org.apache.pinot.common.config.GrpcConfig.GRPC_TLS_PREFIX;

public class PinotGrpcDataFetcher
implements PinotDataFetcher
Expand Down Expand Up @@ -154,13 +155,13 @@ public interface GrpcQueryClientFactory
public static class PlainTextGrpcQueryClientFactory
implements GrpcQueryClientFactory
{
private final GrpcQueryClient.Config config;
private final GrpcConfig config;

@Inject
public PlainTextGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConfig)
{
requireNonNull(grpcClientConfig, "grpcClientConfig is null");
this.config = new GrpcQueryClient.Config(ImmutableMap.<String, Object>builder()
this.config = new GrpcConfig(ImmutableMap.<String, Object>builder()
.put(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, String.valueOf(grpcClientConfig.getMaxInboundMessageSize().toBytes()))
.put(CONFIG_USE_PLAIN_TEXT, String.valueOf(grpcClientConfig.isUsePlainText()))
.buildOrThrow());
Expand All @@ -185,7 +186,7 @@ public static class TlsGrpcQueryClientFactory
private static final String TRUSTSTORE_PASSWORD = GRPC_TLS_PREFIX + "." + "truststore.password";
private static final String SSL_PROVIDER = GRPC_TLS_PREFIX + "." + "ssl.provider";

private final GrpcQueryClient.Config config;
private final GrpcConfig config;

@Inject
public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig)
Expand All @@ -203,7 +204,7 @@ public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig)
}
tlsConfigBuilder.put(SSL_PROVIDER, tlsConfig.getSslProvider());

this.config = new GrpcQueryClient.Config(tlsConfigBuilder.buildOrThrow());
this.config = new GrpcConfig(tlsConfigBuilder.buildOrThrow());
}

@Override
Expand All @@ -215,8 +216,6 @@ public GrpcQueryClient create(HostAndPort hostAndPort)

public static class PinotGrpcServerQueryClient
Comment thread
elonazoulay marked this conversation as resolved.
Outdated
{
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();

private final PinotHostMapper pinotHostMapper;
private final Map<HostAndPort, GrpcQueryClient> clientCache = new ConcurrentHashMap<>();
private final int grpcPort;
Expand All @@ -241,7 +240,7 @@ public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, Str
closer.register(queryClient::close);
return queryClient;
});
BrokerRequest brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query);
BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder()
.setSql(query)
.setSegments(segments)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.airlift.configuration.Config;
import io.airlift.units.DataSize;

import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE;
import static org.apache.pinot.common.config.GrpcConfig.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE;

public class PinotGrpcServerQueryClientConfig
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
Expand All @@ -31,6 +30,7 @@
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
Expand Down Expand Up @@ -172,7 +172,6 @@ public int getRowLimit()

public static class PinotLegacyServerQueryClient
{
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
private static final String TRINO_HOST_PREFIX = "trino-pinot-master";

private final String trinoHostId;
Expand Down Expand Up @@ -212,7 +211,7 @@ public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, Str
// TODO: separate into offline and realtime methods
BrokerRequest brokerRequest;
try {
brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query);
brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
}
catch (SqlCompilationException e) {
throw new PinotException(PINOT_INVALID_PQL_GENERATED, Optional.of(query), format("Parsing error with on %s, Error = %s", serverHost, e.getMessage()), e);
Expand All @@ -229,7 +228,7 @@ public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, Str
AsyncQueryResponse asyncQueryResponse =
doWithRetries(pinotRetryCount, requestId -> queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, connectionTimeoutInMillis));
try {
Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses();
ImmutableList.Builder<PinotDataTableWithSize> pinotDataTableWithSizeBuilder = ImmutableList.builder();
for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : response.entrySet()) {
ServerResponse serverResponse = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.reduce.PostAggregationHandler;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;

Expand Down Expand Up @@ -62,7 +62,6 @@

public final class DynamicTableBuilder
{
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
public static final String OFFLINE_SUFFIX = "_OFFLINE";
public static final String REALTIME_SUFFIX = "_REALTIME";
private static final Set<AggregationFunctionType> NON_NULL_ON_EMPTY_AGGREGATIONS = EnumSet.of(COUNT, DISTINCTCOUNT, DISTINCTCOUNTHLL);
Expand All @@ -77,9 +76,10 @@ public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTable
requireNonNull(schemaTableName, "schemaTableName is null");
requireNonNull(typeConverter, "typeConverter is null");
String query = schemaTableName.getTableName();
BrokerRequest request = REQUEST_COMPILER.compileToBrokerRequest(query);
BrokerRequest request = CalciteSqlCompiler.compileToBrokerRequest(query);
PinotQuery pinotQuery = request.getPinotQuery();
QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(request);
QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);

String tableName = request.getQuerySource().getTableName();
String trinoTableName = stripSuffix(tableName).toLowerCase(ENGLISH);
String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableName(trinoTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.core.operator.transform.transformer.datetime.BaseDateTimeTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetime.DateTimeTransformerFactory;
import org.apache.pinot.core.operator.transform.transformer.datetime.EpochToEpochTransformer;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.DateTimeFormatSpec;

import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -65,8 +67,6 @@
import static org.apache.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory.getTimeUnitTransformer;
import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT;
import static org.apache.pinot.segment.spi.AggregationFunctionType.getAggregationFunctionType;
import static org.apache.pinot.spi.data.DateTimeFormatSpec.validateFormat;
import static org.apache.pinot.spi.data.DateTimeGranularitySpec.validateGranularity;

public class PinotExpressionRewriter
{
Expand Down Expand Up @@ -177,13 +177,13 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex
ImmutableList.Builder<ExpressionContext> argumentsBuilder = ImmutableList.builder();
argumentsBuilder.add(rewriteExpression(object.getArguments().get(0), context));
String inputFormat = object.getArguments().get(1).getLiteral().toUpperCase(ENGLISH);
checkDateTimeFormatSpec(inputFormat);
argumentsBuilder.add(forLiteral(inputFormat));
String outputFormat = object.getArguments().get(2).getLiteral().toUpperCase(ENGLISH);
checkDateTimeFormatSpec(outputFormat);
argumentsBuilder.add(forLiteral(outputFormat));
String granularity = object.getArguments().get(3).getLiteral().toUpperCase(ENGLISH);
validateGranularity(granularity);
BaseDateTimeTransformer dateTimeTransformer = DateTimeTransformerFactory.getDateTimeTransformer(inputFormat, outputFormat, granularity);
// Even if the format is valid, make sure it is not a simple date format: format characters can be ambiguous due to lower casing
checkState(dateTimeTransformer instanceof EpochToEpochTransformer, "Unsupported date format: simple date format not supported");
argumentsBuilder.add(forLiteral(granularity));
return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build());
}
Expand Down Expand Up @@ -306,15 +306,6 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex
}
}

/**
 * Validates a date time format literal and rejects formats that carry a simple date format pattern.
 *
 * @param dateTimeFormat the format literal to check; must not be null
 */
private static void checkDateTimeFormatSpec(String dateTimeFormat)
{
    validateFormat(requireNonNull(dateTimeFormat, "dateTimeFormat is null"));
    // A format that passes validation may still be a simple date format: its format characters can be ambiguous due to lower casing
    boolean hasSimpleDateFormatPattern = new DateTimeFormatSpec(dateTimeFormat).getSDFPattern() != null;
    checkState(!hasSimpleDateFormatPattern, "Unsupported date format: simple date format not supported");
}

private static void verifyIsIdentifierOrFunction(ExpressionContext expressionContext)
{
verify(expressionContext.getType() == IDENTIFIER || expressionContext.getType() == FUNCTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,16 @@ public static Pattern<FunctionContext> binaryFunction()
});
}

/**
 * Property exposing the function name of a {@link FunctionContext} whose type is {@code TRANSFORM}.
 * The property is empty for any other function type.
 *
 * NOTE(review): the property label is "transformFunctionType" although the extracted value is the
 * function name; confirm whether the label is intentional before renaming it.
 */
public static Property<FunctionContext, ?, String> transformFunctionName()
{
    return Property.optionalProperty(
            "transformFunctionType",
            function -> function.getType() != TRANSFORM
                    ? Optional.empty()
                    : Optional.of(function.getFunctionName()));
}

// AggregationFunction Properties
public static Property<FunctionContext, ?, AggregationFunctionType> aggregationFunctionType()
{
Expand Down
Loading