2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/pinot.rst
@@ -14,7 +14,7 @@ Requirements

 To connect to Pinot, you need:

-* Pinot 0.10.0 or higher.
+* Pinot 0.11.0 or higher.
 * Network access from the Trino coordinator and workers to the Pinot controller
   nodes. Port 8098 is the default port.

2 changes: 1 addition & 1 deletion plugin/trino-pinot/pom.xml
@@ -14,7 +14,7 @@

 <properties>
     <air.main.basedir>${project.parent.basedir}</air.main.basedir>
-    <dep.pinot.version>0.11.0</dep.pinot.version>
+    <dep.pinot.version>0.12.1</dep.pinot.version>
     <!--
         Project's default for air.test.parallel is 'methods'. By design, 'instances' makes TestNG run tests from one class in a single thread.
         As a side effect, it prevents TestNG from initializing multiple test instances upfront, which happens with 'methods'.
…/PinotDataFetcher.java
@@ -16,7 +16,7 @@
 import io.trino.plugin.pinot.PinotException;
 import io.trino.plugin.pinot.PinotSplit;
 import io.trino.spi.connector.ConnectorSession;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;

 import java.util.List;
 import java.util.Map;
@@ -27,7 +27,7 @@
 import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
-import static org.apache.pinot.common.utils.DataTable.EXCEPTION_METADATA_KEY;
+import static org.apache.pinot.common.datatable.DataTable.EXCEPTION_METADATA_KEY;

 public interface PinotDataFetcher
 {
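Pinot 0.12 relocated the DataTable interface from org.apache.pinot.common.utils to org.apache.pinot.common.datatable; both hunks above only chase that package rename. A sketch of how the relocated constant is typically consumed — the getMetadata() scan is an assumed usage pattern, not code from this PR:

    import org.apache.pinot.common.datatable.DataTable;

    import static org.apache.pinot.common.datatable.DataTable.EXCEPTION_METADATA_KEY;

    final class DataTableErrors
    {
        private DataTableErrors() {}

        // Server-side errors arrive as metadata entries whose keys start with
        // EXCEPTION_METADATA_KEY (assumed behavior, not taken from this diff).
        static boolean hasException(DataTable dataTable)
        {
            return dataTable.getMetadata().keySet().stream()
                    .anyMatch(key -> key.startsWith(EXCEPTION_METADATA_KEY));
        }
    }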
…/PinotDataTableWithSize.java
@@ -13,7 +13,7 @@
  */
 package io.trino.plugin.pinot.client;

-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;

 public class PinotDataTableWithSize
 {
@@ -23,9 +23,9 @@
 import io.trino.plugin.pinot.query.PinotProxyGrpcRequestBuilder;
 import io.trino.spi.connector.ConnectorSession;
 import org.apache.pinot.common.config.GrpcConfig;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys;
 import org.apache.pinot.spi.utils.CommonConstants.Query.Response.ResponseType;
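DataTableFactory moved as well, from org.apache.pinot.core.common.datatable to org.apache.pinot.common.datatable. A sketch of the assumed decode path for a server response buffer; getDataTable(ByteBuffer) is my reading of the Pinot API, not a line from this PR:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.pinot.common.datatable.DataTable;
    import org.apache.pinot.common.datatable.DataTableFactory;

    final class ResponseDecoder
    {
        private ResponseDecoder() {}

        // Deserialize one serialized payload into a DataTable (assumed API usage).
        static DataTable decode(ByteBuffer payload)
                throws IOException
        {
            return DataTableFactory.getDataTable(payload);
        }
    }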
…/PinotLegacyServerQueryClient.java
@@ -21,15 +21,17 @@
 import io.trino.plugin.pinot.PinotSessionProperties;
 import io.trino.plugin.pinot.PinotSplit;
 import io.trino.spi.connector.ConnectorSession;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.transport.AsyncQueryResponse;
 import org.apache.pinot.core.transport.QueryRouter;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.transport.ServerResponse;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -189,7 +191,7 @@ public PinotLegacyServerQueryClient(PinotHostMapper pinotHostMapper, PinotConfig
         PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
         this.brokerMetrics = new BrokerMetrics(registry);
         brokerMetrics.initializeGlobalMeters();
-        queryRouter = new QueryRouter(trinoHostId, brokerMetrics);
+        queryRouter = new QueryRouter(trinoHostId, brokerMetrics, new ServerRoutingStatsManager(new PinotConfiguration()));
     }

     private static String getDefaultTrinoId()
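Pinot 0.12 extended the QueryRouter constructor with a required ServerRoutingStatsManager, which backs the adaptive server selection added in that release; built from an empty PinotConfiguration it runs with defaults, so the feature stays off unless enabled. A minimal sketch of the wiring, with trinoHostId and brokerMetrics standing in for the surrounding fields:

    // Assumed-minimal wiring for Pinot 0.12.x, mirroring the change above.
    ServerRoutingStatsManager statsManager = new ServerRoutingStatsManager(new PinotConfiguration());
    QueryRouter queryRouter = new QueryRouter(trinoHostId, brokerMetrics, statsManager);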
@@ -51,18 +51,20 @@
 import static io.trino.plugin.pinot.query.PinotPatterns.transformFunction;
 import static io.trino.plugin.pinot.query.PinotPatterns.transformFunctionType;
 import static io.trino.plugin.pinot.query.PinotSqlFormatter.getColumnHandle;
+import static io.trino.plugin.pinot.query.PinotTransformFunctionTypeResolver.getTransformFunctionType;
 import static java.lang.String.format;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
 import static org.apache.pinot.common.function.TransformFunctionType.DATETIMECONVERT;
 import static org.apache.pinot.common.function.TransformFunctionType.DATETRUNC;
 import static org.apache.pinot.common.function.TransformFunctionType.TIMECONVERT;
+import static org.apache.pinot.common.request.Literal.stringValue;
 import static org.apache.pinot.common.request.context.ExpressionContext.Type.FUNCTION;
 import static org.apache.pinot.common.request.context.ExpressionContext.Type.IDENTIFIER;
 import static org.apache.pinot.common.request.context.ExpressionContext.Type.LITERAL;
 import static org.apache.pinot.common.request.context.ExpressionContext.forFunction;
 import static org.apache.pinot.common.request.context.ExpressionContext.forIdentifier;
-import static org.apache.pinot.common.request.context.ExpressionContext.forLiteral;
+import static org.apache.pinot.common.request.context.ExpressionContext.forLiteralContext;
 import static org.apache.pinot.core.operator.transform.function.DateTruncTransformFunction.EXAMPLE_INVOCATION;
 import static org.apache.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory.getTimeUnitTransformer;
 import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT;
@@ -124,7 +126,7 @@ private static FunctionContext rewriteFunction(FunctionContext functionContext,
     {
         Optional<FunctionContext> result = Optional.empty();
         if (functionContext.getType() == FunctionContext.Type.TRANSFORM) {
-            RewriteRule<FunctionContext> rule = FUNCTION_RULE_MAP.get(TransformFunctionType.getTransformFunctionType(functionContext.getFunctionName()));
+            RewriteRule<FunctionContext> rule = FUNCTION_RULE_MAP.get(getTransformFunctionType(functionContext).orElseThrow());
             if (rule != null) {
                 result = applyRule(rule, functionContext, context);
             }
Expand Down Expand Up @@ -176,15 +178,15 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex

             ImmutableList.Builder<ExpressionContext> argumentsBuilder = ImmutableList.builder();
             argumentsBuilder.add(rewriteExpression(object.getArguments().get(0), context));
-            String inputFormat = object.getArguments().get(1).getLiteral().toUpperCase(ENGLISH);
-            argumentsBuilder.add(forLiteral(inputFormat));
-            String outputFormat = object.getArguments().get(2).getLiteral().toUpperCase(ENGLISH);
-            argumentsBuilder.add(forLiteral(outputFormat));
-            String granularity = object.getArguments().get(3).getLiteral().toUpperCase(ENGLISH);
+            String inputFormat = object.getArguments().get(1).getLiteral().getValue().toString().toUpperCase(ENGLISH);
+            argumentsBuilder.add(forLiteralContext(stringValue(inputFormat)));
+            String outputFormat = object.getArguments().get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH);
+            argumentsBuilder.add(forLiteralContext(stringValue(outputFormat)));
+            String granularity = object.getArguments().get(3).getLiteral().getValue().toString().toUpperCase(ENGLISH);
             BaseDateTimeTransformer dateTimeTransformer = DateTimeTransformerFactory.getDateTimeTransformer(inputFormat, outputFormat, granularity);
             // Even if the format is valid, make sure it is not a simple date format: format characters can be ambiguous due to lower casing
             checkState(dateTimeTransformer instanceof EpochToEpochTransformer, "Unsupported date format: simple date format not supported");
-            argumentsBuilder.add(forLiteral(granularity));
+            argumentsBuilder.add(forLiteralContext(stringValue(granularity)));
             return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build());
         }
     }
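All three rewrite hunks in this file adapt to the same Pinot API change: ExpressionContext.getLiteral() now returns a typed literal object (LiteralContext in Pinot 0.12) rather than a raw String, and literal expressions are built with forLiteralContext over a thrift Literal. A condensed before/after sketch using the names imported above; the variable names are illustrative:

    // Pinot 0.11.x: literals were plain strings.
    String value = expressionContext.getLiteral();
    ExpressionContext literal = forLiteral(value);

    // Pinot 0.12.x: the literal is typed; getValue() returns Object.
    String value12 = expressionContext.getLiteral().getValue().toString();
    ExpressionContext literal12 = forLiteralContext(stringValue(value12));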
@@ -209,13 +211,13 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex

             ImmutableList.Builder<ExpressionContext> argumentsBuilder = ImmutableList.builder();
             argumentsBuilder.add(rewriteExpression(object.getArguments().get(0), context));
-            String inputTimeUnitArgument = object.getArguments().get(1).getLiteral().toUpperCase(ENGLISH);
+            String inputTimeUnitArgument = object.getArguments().get(1).getLiteral().getValue().toString().toUpperCase(ENGLISH);
             TimeUnit inputTimeUnit = TimeUnit.valueOf(inputTimeUnitArgument);
-            String outputTimeUnitArgument = object.getArguments().get(2).getLiteral().toUpperCase(ENGLISH);
+            String outputTimeUnitArgument = object.getArguments().get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH);
             // Check that this is a valid time unit transform
             getTimeUnitTransformer(inputTimeUnit, outputTimeUnitArgument);
-            argumentsBuilder.add(forLiteral(inputTimeUnitArgument));
-            argumentsBuilder.add(forLiteral(outputTimeUnitArgument));
+            argumentsBuilder.add(forLiteralContext(stringValue(inputTimeUnitArgument)));
+            argumentsBuilder.add(forLiteralContext(stringValue(outputTimeUnitArgument)));
             return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build());
         }
     }
@@ -240,27 +242,27 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex
             ImmutableList.Builder<ExpressionContext> argumentsBuilder = ImmutableList.builder();

             checkState(arguments.get(0).getType() == LITERAL, "First argument must be a literal");
-            String unit = arguments.get(0).getLiteral().toLowerCase(ENGLISH);
-            argumentsBuilder.add(forLiteral(unit));
+            String unit = arguments.get(0).getLiteral().getValue().toString().toLowerCase(ENGLISH);
+            argumentsBuilder.add(forLiteralContext(stringValue(unit)));
             verifyIsIdentifierOrFunction(object.getArguments().get(1));
             ExpressionContext valueArgument = rewriteExpression(arguments.get(1), context);
             argumentsBuilder.add(valueArgument);
             if (arguments.size() >= 3) {
                 checkState(arguments.get(2).getType() == LITERAL, "Unexpected 3rd argument: '%s'", arguments.get(2));
-                String inputTimeUnitArgument = arguments.get(2).getLiteral().toUpperCase(ENGLISH);
+                String inputTimeUnitArgument = arguments.get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH);
                 // Ensure this is a valid TimeUnit
                 TimeUnit inputTimeUnit = TimeUnit.valueOf(inputTimeUnitArgument);
-                argumentsBuilder.add(forLiteral(inputTimeUnit.name()));
+                argumentsBuilder.add(forLiteralContext(stringValue(inputTimeUnit.name())));
                 if (arguments.size() >= 4) {
                     checkState(arguments.get(3).getType() == LITERAL, "Unexpected 4th argument '%s'", arguments.get(3));
                     // Time zone is lower cased inside Pinot
                     argumentsBuilder.add(arguments.get(3));
                     if (arguments.size() >= 5) {
                         checkState(arguments.get(4).getType() == LITERAL, "Unexpected 5th argument: '%s'", arguments.get(4));
-                        String outputTimeUnitArgument = arguments.get(4).getLiteral().toUpperCase(ENGLISH);
+                        String outputTimeUnitArgument = arguments.get(4).getLiteral().getValue().toString().toUpperCase(ENGLISH);
                         // Ensure this is a valid TimeUnit
                         TimeUnit outputTimeUnit = TimeUnit.valueOf(outputTimeUnitArgument);
-                        argumentsBuilder.add(forLiteral(outputTimeUnit.name()));
+                        argumentsBuilder.add(forLiteralContext(stringValue(outputTimeUnit.name())));
                     }
                 }
             }
@@ -34,7 +34,7 @@
 import java.util.Optional;

 import static io.trino.matching.Pattern.typeOf;
-import static org.apache.pinot.common.function.TransformFunctionType.getTransformFunctionType;
+import static io.trino.plugin.pinot.query.PinotTransformFunctionTypeResolver.getTransformFunctionType;
 import static org.apache.pinot.common.request.context.ExpressionContext.Type.FUNCTION;
 import static org.apache.pinot.common.request.context.ExpressionContext.Type.IDENTIFIER;
 import static org.apache.pinot.common.request.context.FunctionContext.Type.AGGREGATION;
@@ -235,7 +235,7 @@ public static Pattern<FunctionContext> binaryFunction()
     {
         return Property.optionalProperty("transformFunctionType", functionContext -> {
             if (functionContext.getType() == TRANSFORM) {
-                return Optional.of(getTransformFunctionType(functionContext.getFunctionName()));
+                return getTransformFunctionType(functionContext);
             }
             return Optional.empty();
         });
@@ -71,6 +71,7 @@
 import static io.trino.plugin.pinot.query.PinotPatterns.transformFunction;
 import static io.trino.plugin.pinot.query.PinotPatterns.transformFunctionName;
 import static io.trino.plugin.pinot.query.PinotPatterns.transformFunctionType;
+import static io.trino.plugin.pinot.query.PinotTransformFunctionTypeResolver.getTransformFunctionType;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.joining;
@@ -198,7 +199,7 @@ private static String formatExpression(ExpressionContext expressionContext, Cont
     {
         switch (expressionContext.getType()) {
             case LITERAL:
-                return singleQuoteValue(expressionContext.getLiteral());
+                return singleQuoteValue(expressionContext.getLiteral().getValue().toString());
             case IDENTIFIER:
                 if (context.getColumnHandles().isPresent()) {
                     return quoteIdentifier(getColumnHandle(expressionContext.getIdentifier(), context.getSchemaTableName(), context.getColumnHandles().get()).getColumnName());
@@ -214,7 +215,7 @@ private static String formatFunction(FunctionContext functionContext, Context co
     {
         Optional<String> result = Optional.empty();
         if (functionContext.getType() == FunctionContext.Type.TRANSFORM) {
-            Rule<FunctionContext> rule = FUNCTION_RULE_MAP.get(TransformFunctionType.getTransformFunctionType(functionContext.getFunctionName()));
+            Rule<FunctionContext> rule = FUNCTION_RULE_MAP.get(getTransformFunctionType(functionContext).orElseThrow());

             if (rule != null) {
                 result = applyRule(rule, functionContext, context);
…/PinotTransformFunctionTypeResolver.java (new file)
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.pinot.query;
+
+import org.apache.pinot.common.function.FunctionRegistry;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.common.request.context.FunctionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+import static org.apache.pinot.common.function.TransformFunctionType.SCALAR;
+import static org.apache.pinot.core.operator.transform.function.TransformFunctionFactory.canonicalize;
+
+public final class PinotTransformFunctionTypeResolver
+{
+    private PinotTransformFunctionTypeResolver() {}
+
+    private static final Map<String, TransformFunctionType> TRANSFORM_FUNCTION_TYPE_MAP;
+
+    static
+    {
+        Map<String, TransformFunctionType> builder = new HashMap<>();
+        for (TransformFunctionType transformFunctionType : TransformFunctionType.values()) {
+            for (String alias : transformFunctionType.getAliases()) {
+                TransformFunctionType previousValue = builder.put(canonicalize(alias), transformFunctionType);
+                checkState(previousValue == null || previousValue == transformFunctionType, "Duplicate key with different values for alias '%s', transform function type '%s' and previous value '%s'", canonicalize(alias), transformFunctionType, previousValue);

> Member: Did you mean to use == for comparing TransformFunctionType? Does it not implement equality and we need to use identity?
>
> Member (author): It's an enum, is that ok?

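For reference, Java enum constants are singletons: Enum.equals() is declared final and delegates to ==, so identity and equality always agree. A self-contained check:

    import org.apache.pinot.common.function.TransformFunctionType;

    public class EnumComparisonCheck
    {
        public static void main(String[] args)
        {
            // valueOf returns the same constant instance, so identity holds.
            TransformFunctionType a = TransformFunctionType.DATETRUNC;
            TransformFunctionType b = TransformFunctionType.valueOf("DATETRUNC");
            System.out.println(a == b);      // true
            System.out.println(a.equals(b)); // true: Enum.equals uses ==
        }
    }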
+            }
+        }
+        TRANSFORM_FUNCTION_TYPE_MAP = Map.copyOf(builder);
+    }
+
+    // Extracted from org.apache.pinot.core.operator.transform.function.TransformFunctionFactory::get
+    public static Optional<TransformFunctionType> getTransformFunctionType(FunctionContext function)
+    {
+        requireNonNull(function, "function is null");
+        String canonicalizedFunctionName = canonicalize(function.getFunctionName());
+        TransformFunctionType transformFunctionType = TRANSFORM_FUNCTION_TYPE_MAP.get(canonicalizedFunctionName);
+        if (transformFunctionType != null) {
+            return Optional.of(transformFunctionType);
+        }
+        if (FunctionRegistry.containsFunction(canonicalizedFunctionName)) {
+            return Optional.of(SCALAR);
+        }
+        return Optional.empty();
+    }
+}
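A hypothetical caller, mirroring how the rewriter and formatter hunks above consume this resolver; unlike the old TransformFunctionType.getTransformFunctionType(String), which could throw for unknown names, the Optional lets pushdown decline gracefully:

    // Illustrative only: functionContext and FUNCTION_RULE_MAP come from the
    // surrounding rewrite/format code shown earlier in this diff.
    Optional<TransformFunctionType> type = getTransformFunctionType(functionContext);
    if (type.isEmpty()) {
        return Optional.empty(); // unknown function: skip pushdown instead of throwing
    }
    Rule<FunctionContext> rule = FUNCTION_RULE_MAP.get(type.orElseThrow());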
@@ -83,7 +83,6 @@
 import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
 import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
 import static io.trino.plugin.pinot.PinotQueryRunner.createPinotQueryRunner;
-import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME;
 import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_PREVIOUS_IMAGE_NAME;
 import static io.trino.spi.type.DoubleType.DOUBLE;
 import static io.trino.spi.type.RealType.REAL;
@@ -142,11 +141,6 @@ protected String getPinotImageName()
         return PINOT_PREVIOUS_IMAGE_NAME;
     }

-    protected boolean isLatestVersion()
-    {
-        return getPinotImageName().equals(PINOT_LATEST_IMAGE_NAME);
-    }
-
     @Override
     protected QueryRunner createQueryRunner()
             throws Exception
@@ -308,13 +302,6 @@ private void createAndPopulateTooManyRowsTable(TestingKafka kafka, TestingPinotC
                     .set("updatedAt", initialUpdatedAt.plusMillis(i * 1000).toEpochMilli())
                     .build()));
         }
-        // For pinot 0.11.0+: rows with null time column values are ingested
-        // Only add a null row with a null time column for pinot < 0.11.0
-        if (!isLatestVersion()) {
-            // Add a null row, verify it was not ingested as pinot does not accept null time column values.
-            // The data is verified in testBrokerQueryWithTooManyRowsForSegmentQuery
-            tooManyRowsRecordsBuilder.add(new ProducerRecord<>(TOO_MANY_ROWS_TABLE, "key" + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES, new GenericRecordBuilder(tooManyRowsAvroSchema).build()));
-        }
         kafka.sendMessages(tooManyRowsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
         pinot.createSchema(getClass().getClassLoader().getResourceAsStream("too_many_rows_schema.json"), TOO_MANY_ROWS_TABLE);
         pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("too_many_rows_realtimeSpec.json"), TOO_MANY_ROWS_TABLE);