diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml index b2f61f574cb2..385ba451f821 100755 --- a/plugin/trino-pinot/pom.xml +++ b/plugin/trino-pinot/pom.xml @@ -13,7 +13,7 @@ Trino - Pinot connector - 1.1.0 + 1.2.0 true ${air.test.jvm.additional-arguments.default} @@ -25,7 +25,13 @@ net.openhft posix - 2.26ea0 + 2.26ea1 + + + + org.apache.commons + commons-text + 1.12.0 @@ -152,8 +158,12 @@ ${dep.pinot.version} - javax.servlet - javax.servlet-api + jakarta.servlet + jakarta.servlet-api + + + jakarta.ws.rs + jakarta.ws.rs-api org.apache.logging.log4j @@ -171,9 +181,37 @@ org.apache.logging.log4j log4j-slf4j2-impl + + org.glassfish.hk2 + * + + + org.glassfish.hk2.external + aopalliance-repackaged + + + org.glassfish.jersey.containers + jersey-container-grizzly2-http + + + org.glassfish.jersey.containers + jersey-container-servlet-core + org.glassfish.jersey.core - jersey-server + * + + + org.glassfish.jersey.ext + jersey-entity-filtering + + + org.glassfish.jersey.inject + * + + + org.glassfish.jersey.media + * org.slf4j @@ -215,6 +253,10 @@ org.glassfish.hk2.external aopalliance-repackaged + + org.glassfish.hk2.external + aopalliance-repackaged + org.glassfish.hk2.external jakarta.inject @@ -328,7 +370,7 @@ net.openhft chronicle-core - 2.25ea15 + 2.26ea1 runtime diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java index 6f5080c4b15c..0a9b501649b6 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java @@ -32,6 +32,7 @@ import io.trino.plugin.pinot.client.PinotHostMapper; import io.trino.spi.NodeManager; import io.trino.spi.connector.ConnectorNodePartitioningProvider; +import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.utils.DataSchema; import java.io.IOException; @@ -90,6 +91,8 @@ public void setup(Binder binder) }); jsonBinder(binder).addDeserializerBinding(DataSchema.class).to(DataSchemaDeserializer.class); + jsonBinder(binder).addDeserializerBinding(BrokerResponseNative.class).to(BrokerResponseNativeDeserializer.class); + PinotClient.addJsonBinders(jsonCodecBinder(binder)); binder.bind(NodeManager.class).toInstance(nodeManager); binder.bind(ConnectorNodePartitioningProvider.class).to(PinotNodePartitioningProvider.class).in(Scopes.SINGLETON); @@ -120,6 +123,19 @@ public DataSchema deserialize(JsonParser p, DeserializationContext ctxt) } } + public static class BrokerResponseNativeDeserializer + extends JsonDeserializer + { + @Override + public BrokerResponseNative deserialize(JsonParser p, DeserializationContext ctxt) + throws IOException + { + JsonNode jsonNode = ctxt.readTree(p); + String value = jsonNode.toString(); + return BrokerResponseNative.fromJsonString(value); + } + } + public static class PinotGrpcModule extends AbstractConfigurationAwareModule { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java index ec0b4b82c044..98a1b16d87b3 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java @@ -442,8 +442,8 @@ public TimeBoundary( { if (timeColumn != null && timeValue != null) { // See org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler::attachTimeBoundary - offlineTimePredicate = Optional.of(format("%s <= %s", timeColumn, timeValue)); - onlineTimePredicate = Optional.of(format("%s > %s", timeColumn, timeValue)); + offlineTimePredicate = Optional.of(format("%s <= '%s'", timeColumn, timeValue)); + onlineTimePredicate = Optional.of(format("%s > '%s'", timeColumn, timeValue)); } else { onlineTimePredicate = Optional.empty(); @@ -557,10 +557,10 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin BrokerResponseNative response = doHttpActionWithHeadersJson(builder, Optional.of(queryRequest), brokerResponseCodec, additionalHeadersBuilder.build()); - if (response.getExceptionsSize() > 0 && response.getProcessingExceptions() != null && !response.getProcessingExceptions().isEmpty()) { + if (response.getExceptionsSize() > 0 && response.getExceptions() != null && !response.getExceptions().isEmpty()) { // Pinot is known to return exceptions with benign errorcodes like 200 // so we treat any exception as an error - String processingExceptionMessage = response.getProcessingExceptions().stream() + String processingExceptionMessage = response.getExceptions().stream() .map(e -> "code: '%s' message: '%s'".formatted(e.getErrorCode(), e.getMessage())) .collect(joining(",")); throw new PinotException( diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java index e4aa11be38fe..9a3483c10e7c 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java @@ -245,7 +245,7 @@ public Iterator queryPinot(String query, String serverHo // GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default). GrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> { GrpcQueryClient queryClient = proxyUri.isPresent() ? grpcQueryClientFactory.create(HostAndPort.fromString(proxyUri.get())) : grpcQueryClientFactory.create(hostAndPort); - closer.register(queryClient::close); + closer.register(queryClient); return queryClient; }); PinotProxyGrpcRequestBuilder grpcRequestBuilder = new PinotProxyGrpcRequestBuilder() diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotExpressionRewriter.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotExpressionRewriter.java index a8e100233785..bfd53aebfe18 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotExpressionRewriter.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotExpressionRewriter.java @@ -64,7 +64,7 @@ import static org.apache.pinot.common.request.context.ExpressionContext.Type.LITERAL; import static org.apache.pinot.common.request.context.ExpressionContext.forFunction; import static org.apache.pinot.common.request.context.ExpressionContext.forIdentifier; -import static org.apache.pinot.common.request.context.ExpressionContext.forLiteralContext; +import static org.apache.pinot.common.request.context.ExpressionContext.forLiteral; import static org.apache.pinot.core.operator.transform.function.DateTruncTransformFunction.EXAMPLE_INVOCATION; import static org.apache.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory.getTimeUnitTransformer; import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT; @@ -175,14 +175,14 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex ImmutableList.Builder argumentsBuilder = ImmutableList.builder(); argumentsBuilder.add(rewriteExpression(object.getArguments().get(0), context)); String inputFormat = object.getArguments().get(1).getLiteral().getValue().toString().toUpperCase(ENGLISH); - argumentsBuilder.add(forLiteralContext(stringValue(inputFormat))); + argumentsBuilder.add(forLiteral(stringValue(inputFormat))); String outputFormat = object.getArguments().get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH); - argumentsBuilder.add(forLiteralContext(stringValue(outputFormat))); + argumentsBuilder.add(forLiteral(stringValue(outputFormat))); String granularity = object.getArguments().get(3).getLiteral().getValue().toString().toUpperCase(ENGLISH); BaseDateTimeTransformer dateTimeTransformer = DateTimeTransformerFactory.getDateTimeTransformer(inputFormat, outputFormat, granularity); // Even if the format is valid, make sure it is not a simple date format: format characters can be ambiguous due to lower casing checkState(dateTimeTransformer instanceof EpochToEpochTransformer, "Unsupported date format: simple date format not supported"); - argumentsBuilder.add(forLiteralContext(stringValue(granularity))); + argumentsBuilder.add(forLiteral(stringValue(granularity))); return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build()); } } @@ -212,8 +212,8 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex String outputTimeUnitArgument = object.getArguments().get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH); // Check that this is a valid time unit transform getTimeUnitTransformer(inputTimeUnit, outputTimeUnitArgument); - argumentsBuilder.add(forLiteralContext(stringValue(inputTimeUnitArgument))); - argumentsBuilder.add(forLiteralContext(stringValue(outputTimeUnitArgument))); + argumentsBuilder.add(forLiteral(stringValue(inputTimeUnitArgument))); + argumentsBuilder.add(forLiteral(stringValue(outputTimeUnitArgument))); return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build()); } } @@ -239,7 +239,7 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex checkState(arguments.get(0).getType() == LITERAL, "First argument must be a literal"); String unit = arguments.get(0).getLiteral().getValue().toString().toLowerCase(ENGLISH); - argumentsBuilder.add(forLiteralContext(stringValue(unit))); + argumentsBuilder.add(forLiteral(stringValue(unit))); verifyIsIdentifierOrFunction(object.getArguments().get(1)); ExpressionContext valueArgument = rewriteExpression(arguments.get(1), context); argumentsBuilder.add(valueArgument); @@ -248,7 +248,7 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex String inputTimeUnitArgument = arguments.get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH); // Ensure this is a valid TimeUnit TimeUnit inputTimeUnit = TimeUnit.valueOf(inputTimeUnitArgument); - argumentsBuilder.add(forLiteralContext(stringValue(inputTimeUnit.name()))); + argumentsBuilder.add(forLiteral(stringValue(inputTimeUnit.name()))); if (arguments.size() >= 4) { checkState(arguments.get(3).getType() == LITERAL, "Unexpected 4th argument '%s'", arguments.get(3)); // Time zone is lower cased inside Pinot @@ -258,7 +258,7 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex String outputTimeUnitArgument = arguments.get(4).getLiteral().getValue().toString().toUpperCase(ENGLISH); // Ensure this is a valid TimeUnit TimeUnit outputTimeUnit = TimeUnit.valueOf(outputTimeUnitArgument); - argumentsBuilder.add(forLiteralContext(stringValue(outputTimeUnit.name()))); + argumentsBuilder.add(forLiteral(stringValue(outputTimeUnit.name()))); } } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/MetadataUtil.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/MetadataUtil.java index 5c525f853167..ea1d4d37aa90 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/MetadataUtil.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/MetadataUtil.java @@ -77,6 +77,7 @@ protected Type _deserialize(String value, DeserializationContext context) objectMapperProvider.setJsonDeserializers(ImmutableMap., JsonDeserializer>builder() .put(Type.class, new TestingTypeDeserializer()) .put(DataSchema.class, new PinotModule.DataSchemaDeserializer()) + .put(BrokerResponseNative.class, new PinotModule.BrokerResponseNativeDeserializer()) .buildOrThrow()); JsonCodecFactory codecFactory = new JsonCodecFactory(objectMapperProvider); COLUMN_CODEC = codecFactory.jsonCodec(PinotColumnHandle.class); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConnectorSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConnectorSmokeTest.java index 530b516ba3b4..a6eb09e5465a 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConnectorSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConnectorSmokeTest.java @@ -53,6 +53,7 @@ import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.junit.jupiter.api.Test; @@ -646,7 +647,7 @@ record = null; return record; }; SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); - driver.init(segmentGeneratorConfig, dataSource, new TransformPipeline(recordTransformer, null)); + driver.init(segmentGeneratorConfig, dataSource, new RecordEnricherPipeline(), new TransformPipeline(recordTransformer, null)); driver.build(); File segmentOutputDirectory = driver.getOutputDirectory(); File tgzPath = new File(String.join(File.separator, outputDirectory, segmentOutputDirectory.getName() + ".tar.gz")); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java index fba5ca516632..778d7b3e7405 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java @@ -66,7 +66,7 @@ public class TestingPinotCluster implements Closeable { - public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:1.1.0"; + public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:1.2.0"; private static final String ZOOKEEPER_INTERNAL_HOST = "zookeeper"; private static final JsonCodec> LIST_JSON_CODEC = listJsonCodec(String.class); private static final JsonCodec PINOT_SUCCESS_RESPONSE_JSON_CODEC = jsonCodec(PinotSuccessResponse.class);