diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst index 0c31882a266a..b9d0f76e3416 100644 --- a/docs/src/main/sphinx/connector/pinot.rst +++ b/docs/src/main/sphinx/connector/pinot.rst @@ -14,7 +14,7 @@ Requirements To connect to Pinot, you need: -* Pinot 0.9.3 or higher. +* Pinot 0.10.0 or higher. * Network access from the Trino coordinator and workers to the Pinot controller nodes. Port 8098 is the default port. diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml index 7889ee7af081..cebe816a570d 100755 --- a/plugin/trino-pinot/pom.xml +++ b/plugin/trino-pinot/pom.xml @@ -14,7 +14,7 @@ ${project.parent.basedir} - 0.10.0 + 0.11.0 + + org.apache.logging.log4j + log4j-slf4j-impl + @@ -308,6 +322,10 @@ com.google.code.findbugs annotations + + org.apache.zookeeper + zookeeper + @@ -464,14 +482,14 @@ org.apache.commons commons-lang3 - 3.8 + 3.11 runtime org.apache.httpcomponents httpcore - 4.4.11 + 4.4.13 runtime @@ -568,6 +586,12 @@ io.confluent kafka-avro-serializer test + + + org.apache.zookeeper + zookeeper + + @@ -635,6 +659,66 @@ + + org.basepom.maven + duplicate-finder-maven-plugin + + + + io.netty + netty-transport-classes-epoll + + + io.netty + netty-transport-native-epoll + + + jakarta.validation + jakarta.validation-api + + + javax.validation + validation-api + + + org.apache.helix + helix-common + + + org.apache.helix + helix-core + + + org.apache.helix + metrics-common + + + org.apache.helix + metadata-store-directory-common + + + org.apache.helix + zookeeper-api + + + commons-logging + commons-logging + + + org.slf4j + jcl-over-slf4j + + + org.apache.logging.log4j + log4j-slf4j-impl:2.17.1 + + + org.slf4j + slf4j-jdk14 + + + + diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java index b68e851d90f6..31d2482101bd 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java @@ -21,6 +21,7 @@ import io.trino.plugin.pinot.PinotException; import io.trino.plugin.pinot.PinotSplit; import io.trino.spi.connector.ConnectorSession; +import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.utils.grpc.GrpcQueryClient; @@ -43,9 +44,9 @@ import java.util.concurrent.ConcurrentHashMap; import static java.util.Objects.requireNonNull; -import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE; -import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.CONFIG_USE_PLAIN_TEXT; -import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.GRPC_TLS_PREFIX; +import static org.apache.pinot.common.config.GrpcConfig.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE; +import static org.apache.pinot.common.config.GrpcConfig.CONFIG_USE_PLAIN_TEXT; +import static org.apache.pinot.common.config.GrpcConfig.GRPC_TLS_PREFIX; public class PinotGrpcDataFetcher implements PinotDataFetcher @@ -154,13 +155,13 @@ public interface GrpcQueryClientFactory public static class PlainTextGrpcQueryClientFactory implements GrpcQueryClientFactory { - private final GrpcQueryClient.Config config; + private final GrpcConfig config; @Inject public PlainTextGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConfig) { requireNonNull(grpcClientConfig, "grpcClientConfig is null"); - this.config = new GrpcQueryClient.Config(ImmutableMap.builder() + this.config = new GrpcConfig(ImmutableMap.builder() .put(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, String.valueOf(grpcClientConfig.getMaxInboundMessageSize().toBytes())) .put(CONFIG_USE_PLAIN_TEXT, String.valueOf(grpcClientConfig.isUsePlainText())) .buildOrThrow()); @@ -185,7 +186,7 @@ public static class TlsGrpcQueryClientFactory private static final String TRUSTSTORE_PASSWORD = GRPC_TLS_PREFIX + "." + "truststore.password"; private static final String SSL_PROVIDER = GRPC_TLS_PREFIX + "." + "ssl.provider"; - private final GrpcQueryClient.Config config; + private final GrpcConfig config; @Inject public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig) @@ -203,7 +204,7 @@ public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig) } tlsConfigBuilder.put(SSL_PROVIDER, tlsConfig.getSslProvider()); - this.config = new GrpcQueryClient.Config(tlsConfigBuilder.buildOrThrow()); + this.config = new GrpcConfig(tlsConfigBuilder.buildOrThrow()); } @Override @@ -215,8 +216,6 @@ public GrpcQueryClient create(HostAndPort hostAndPort) public static class PinotGrpcServerQueryClient { - private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler(); - private final PinotHostMapper pinotHostMapper; private final Map clientCache = new ConcurrentHashMap<>(); private final int grpcPort; @@ -241,7 +240,7 @@ public Iterator queryPinot(ConnectorSession session, Str closer.register(queryClient::close); return queryClient; }); - BrokerRequest brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query); + BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder() .setSql(query) .setSegments(segments) diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java index df0c1739a613..b3e1294f1fb5 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java @@ -16,7 +16,7 @@ import io.airlift.configuration.Config; import io.airlift.units.DataSize; -import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE; +import static org.apache.pinot.common.config.GrpcConfig.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE; public class PinotGrpcServerQueryClientConfig { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotLegacyDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotLegacyDataFetcher.java index 1655c398fefd..c2166e6209b4 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotLegacyDataFetcher.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotLegacyDataFetcher.java @@ -22,7 +22,6 @@ import io.trino.plugin.pinot.PinotSplit; import io.trino.spi.connector.ConnectorSession; import org.apache.pinot.common.metrics.BrokerMetrics; -import org.apache.pinot.common.metrics.PinotMetricUtils; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataTable; @@ -31,6 +30,7 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.ServerResponse; import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; @@ -172,7 +172,6 @@ public int getRowLimit() public static class PinotLegacyServerQueryClient { - private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler(); private static final String TRINO_HOST_PREFIX = "trino-pinot-master"; private final String trinoHostId; @@ -212,7 +211,7 @@ public Iterator queryPinot(ConnectorSession session, Str // TODO: separate into offline and realtime methods BrokerRequest brokerRequest; try { - brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query); + brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); } catch (SqlCompilationException e) { throw new PinotException(PINOT_INVALID_PQL_GENERATED, Optional.of(query), format("Parsing error with on %s, Error = %s", serverHost, e.getMessage()), e); @@ -229,7 +228,7 @@ public Iterator queryPinot(ConnectorSession session, Str AsyncQueryResponse asyncQueryResponse = doWithRetries(pinotRetryCount, requestId -> queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, connectionTimeoutInMillis)); try { - Map response = asyncQueryResponse.getResponse(); + Map response = asyncQueryResponse.getFinalResponses(); ImmutableList.Builder pinotDataTableWithSizeBuilder = ImmutableList.builder(); for (Map.Entry entry : response.entrySet()) { ServerResponse serverResponse = entry.getValue(); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java index d79a778b9b1a..af078e82e405 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java @@ -33,7 +33,7 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.reduce.PostAggregationHandler; import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; @@ -62,7 +62,6 @@ public final class DynamicTableBuilder { - private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler(); public static final String OFFLINE_SUFFIX = "_OFFLINE"; public static final String REALTIME_SUFFIX = "_REALTIME"; private static final Set NON_NULL_ON_EMPTY_AGGREGATIONS = EnumSet.of(COUNT, DISTINCTCOUNT, DISTINCTCOUNTHLL); @@ -77,9 +76,10 @@ public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTable requireNonNull(schemaTableName, "schemaTableName is null"); requireNonNull(typeConverter, "typeConverter is null"); String query = schemaTableName.getTableName(); - BrokerRequest request = REQUEST_COMPILER.compileToBrokerRequest(query); + BrokerRequest request = CalciteSqlCompiler.compileToBrokerRequest(query); PinotQuery pinotQuery = request.getPinotQuery(); - QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(request); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); + String tableName = request.getQuerySource().getTableName(); String trinoTableName = stripSuffix(tableName).toLowerCase(ENGLISH); String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableName(trinoTableName); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotExpressionRewriter.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotExpressionRewriter.java index 9ed9abd33ac8..596724ab5822 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotExpressionRewriter.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotExpressionRewriter.java @@ -23,8 +23,10 @@ import org.apache.pinot.common.function.TransformFunctionType; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.core.operator.transform.transformer.datetime.BaseDateTimeTransformer; +import org.apache.pinot.core.operator.transform.transformer.datetime.DateTimeTransformerFactory; +import org.apache.pinot.core.operator.transform.transformer.datetime.EpochToEpochTransformer; import org.apache.pinot.segment.spi.AggregationFunctionType; -import org.apache.pinot.spi.data.DateTimeFormatSpec; import java.util.HashMap; import java.util.Iterator; @@ -65,8 +67,6 @@ import static org.apache.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory.getTimeUnitTransformer; import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT; import static org.apache.pinot.segment.spi.AggregationFunctionType.getAggregationFunctionType; -import static org.apache.pinot.spi.data.DateTimeFormatSpec.validateFormat; -import static org.apache.pinot.spi.data.DateTimeGranularitySpec.validateGranularity; public class PinotExpressionRewriter { @@ -177,13 +177,13 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex ImmutableList.Builder argumentsBuilder = ImmutableList.builder(); argumentsBuilder.add(rewriteExpression(object.getArguments().get(0), context)); String inputFormat = object.getArguments().get(1).getLiteral().toUpperCase(ENGLISH); - checkDateTimeFormatSpec(inputFormat); argumentsBuilder.add(forLiteral(inputFormat)); String outputFormat = object.getArguments().get(2).getLiteral().toUpperCase(ENGLISH); - checkDateTimeFormatSpec(outputFormat); argumentsBuilder.add(forLiteral(outputFormat)); String granularity = object.getArguments().get(3).getLiteral().toUpperCase(ENGLISH); - validateGranularity(granularity); + BaseDateTimeTransformer dateTimeTransformer = DateTimeTransformerFactory.getDateTimeTransformer(inputFormat, outputFormat, granularity); + // Even if the format is valid, make sure it is not a simple date format: format characters can be ambiguous due to lower casing + checkState(dateTimeTransformer instanceof EpochToEpochTransformer, "Unsupported date format: simple date format not supported"); argumentsBuilder.add(forLiteral(granularity)); return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build()); } @@ -306,15 +306,6 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex } } - private static void checkDateTimeFormatSpec(String dateTimeFormat) - { - requireNonNull(dateTimeFormat, "dateTimeFormat is null"); - validateFormat(dateTimeFormat); - // Even if the format is valid, make sure it is not a simple date format: format characters can be ambiguous due to lower casing - DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(dateTimeFormat); - checkState(dateTimeFormatSpec.getSDFPattern() == null, "Unsupported date format: simple date format not supported"); - } - private static void verifyIsIdentifierOrFunction(ExpressionContext expressionContext) { verify(expressionContext.getType() == IDENTIFIER || expressionContext.getType() == FUNCTION); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotPatterns.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotPatterns.java index 3e16c1108664..93240aa355a0 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotPatterns.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotPatterns.java @@ -241,6 +241,16 @@ public static Pattern binaryFunction() }); } + public static Property transformFunctionName() + { + return Property.optionalProperty("transformFunctionType", functionContext -> { + if (functionContext.getType() == TRANSFORM) { + return Optional.of(functionContext.getFunctionName()); + } + return Optional.empty(); + }); + } + // AggregationFunction Properties public static Property aggregationFunctionType() { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotSqlFormatter.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotSqlFormatter.java index a669a8107772..87501e70dd0e 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotSqlFormatter.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotSqlFormatter.java @@ -69,13 +69,13 @@ import static io.trino.plugin.pinot.query.PinotPatterns.secondArgument; import static io.trino.plugin.pinot.query.PinotPatterns.singleInput; import static io.trino.plugin.pinot.query.PinotPatterns.transformFunction; +import static io.trino.plugin.pinot.query.PinotPatterns.transformFunctionName; import static io.trino.plugin.pinot.query.PinotPatterns.transformFunctionType; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; import static org.apache.pinot.common.function.TransformFunctionType.CASE; import static org.apache.pinot.common.function.TransformFunctionType.CAST; -import static org.apache.pinot.common.function.TransformFunctionType.MINUS; import static org.apache.pinot.common.request.context.ExpressionContext.Type.IDENTIFIER; import static org.apache.pinot.common.request.context.predicate.RangePredicate.UNBOUNDED; import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT; @@ -83,6 +83,8 @@ public class PinotSqlFormatter { + private static final String MINUS = "minus"; + private static final List> FILTER_RULES = ImmutableList.>builder() .add(new AndOrFilterRule()) .add(new PredicateFilterRule()) @@ -93,6 +95,10 @@ public class PinotSqlFormatter .add(new BinaryOperatorPredicateRule()) .build(); + private static final List> GLOBAL_FUNCTION_RULES = ImmutableList.>builder() + .add(new MinusFunctionRule()) + .build(); + private static final Map> PREDICATE_RULE_MAP; private static final Map> FUNCTION_RULE_MAP; private static final Map> AGGREGATION_FUNCTION_RULE_MAP; @@ -113,7 +119,6 @@ public class PinotSqlFormatter Map> functionMap = new HashMap<>(); functionMap.put(CASE, new CaseFunctionRule()); functionMap.put(CAST, new CastFunctionRule()); - functionMap.put(MINUS, new MinusFunctionRule()); FUNCTION_RULE_MAP = immutableEnumMap(functionMap); Map> aggregationFunctionMap = new HashMap<>(); @@ -216,6 +221,9 @@ private static String formatFunction(FunctionContext functionContext, Context co if (rule != null) { result = applyRule(rule, functionContext, context); } + else { + result = applyRules(GLOBAL_FUNCTION_RULES, functionContext, context); + } } else { checkState(functionContext.getType() == FunctionContext.Type.AGGREGATION, "Unexpected function type for '%s'", functionContext); @@ -364,7 +372,7 @@ private static class MinusZeroPredicateRule .with(functionContext().matching(binaryFunction() .with(firstArgument().capturedAs(FIRST_ARGUMENT)) .with(secondArgument().capturedAs(SECOND_ARGUMENT)) - .with(transformFunctionType().equalTo(MINUS)))))); + .with(transformFunctionName().matching(MINUS::equalsIgnoreCase)))))); @Override public Pattern getPattern() @@ -549,7 +557,7 @@ private static class MinusFunctionRule private static final Capture FIRST_ARGUMENT = newCapture(); private static final Capture SECOND_ARGUMENT = newCapture(); private static final Pattern PATTERN = binaryFunction() - .with(transformFunctionType().equalTo(MINUS)) + .with(transformFunctionName().matching(MINUS::equalsIgnoreCase)) .with(firstArgument().capturedAs(FIRST_ARGUMENT)) .with(secondArgument().capturedAs(SECOND_ARGUMENT)); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java index 528ef7137719..180868d4be17 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java @@ -83,6 +83,7 @@ import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; import static io.trino.plugin.pinot.PinotQueryRunner.createPinotQueryRunner; +import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME; import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_PREVIOUS_IMAGE_NAME; import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.RealType.REAL; @@ -139,6 +140,11 @@ protected String getPinotImageName() return PINOT_PREVIOUS_IMAGE_NAME; } + protected boolean isLatestVersion() + { + return getPinotImageName().equals(PINOT_LATEST_IMAGE_NAME); + } + @Override protected QueryRunner createQueryRunner() throws Exception @@ -300,9 +306,13 @@ private void createAndPopulateTooManyRowsTable(TestingKafka kafka, TestingPinotC .set("updatedAt", initialUpdatedAt.plusMillis(i * 1000).toEpochMilli()) .build())); } - // Add a null row, verify it was not ingested as pinot does not accept null time column values. - // The data is verified in testBrokerQueryWithTooManyRowsForSegmentQuery - tooManyRowsRecordsBuilder.add(new ProducerRecord<>(TOO_MANY_ROWS_TABLE, "key" + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES, new GenericRecordBuilder(tooManyRowsAvroSchema).build())); + // For pinot 0.11.0+: rows with null time column values are ingested + // Only add a null row with a null time column for pinot < 0.11.0 + if (!isLatestVersion()) { + // Add a null row, verify it was not ingested as pinot does not accept null time column values. + // The data is verified in testBrokerQueryWithTooManyRowsForSegmentQuery + tooManyRowsRecordsBuilder.add(new ProducerRecord<>(TOO_MANY_ROWS_TABLE, "key" + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES, new GenericRecordBuilder(tooManyRowsAvroSchema).build())); + } kafka.sendMessages(tooManyRowsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("too_many_rows_schema.json"), TOO_MANY_ROWS_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("too_many_rows_realtimeSpec.json"), TOO_MANY_ROWS_TABLE); @@ -700,7 +710,7 @@ protected Map additionalPinotProperties() private static Path createSegment(InputStream tableConfigInputStream, InputStream pinotSchemaInputStream, RecordReader recordReader, String outputDirectory, int sequenceId) { try { - org.apache.pinot.spi.data.Schema pinotSchema = org.apache.pinot.spi.data.Schema.fromInputSteam(pinotSchemaInputStream); + org.apache.pinot.spi.data.Schema pinotSchema = org.apache.pinot.spi.data.Schema.fromInputStream(pinotSchemaInputStream); TableConfig tableConfig = inputStreamToObject(tableConfigInputStream, TableConfig.class); String tableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName()); String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java index 49cc090332ff..028059cc133f 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java @@ -143,7 +143,7 @@ public void testFilterWithCast() String query = "SELECT string_col, long_col" + " FROM " + tableName + " WHERE string_col = CAST(123 AS STRING) AND long_col = CAST('123' AS LONG) LIMIT 60"; String expected = "select \"string_col\", \"long_col\" from primitive_types_table " + - "where AND((\"string_col\") = (CAST('123' AS string)), (\"long_col\") = (CAST('123' AS long))) limit 60"; + "where AND((\"string_col\") = '123', (\"long_col\") = '123') limit 60"; DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); assertEquals(extractPql(dynamicTable, TupleDomain.all()), expected); } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java index 4375d828a076..9b09ba2e29b6 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java @@ -21,7 +21,7 @@ import java.util.Map; -import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE; +import static org.apache.pinot.common.config.GrpcConfig.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE; public class TestPinotGrpcServerQueryClientConfig { diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorSmokeTest.java similarity index 96% rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorSmokeTest.java index 35795c92c566..6555cdbe69b1 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorSmokeTest.java @@ -15,7 +15,7 @@ import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME; -public class TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest +public class TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorSmokeTest extends BasePinotIntegrationConnectorSmokeTest { @Override diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java index 4732955ee2aa..f133fef2e843 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java @@ -59,15 +59,15 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static javax.ws.rs.core.MediaType.APPLICATION_JSON; -import static org.apache.pinot.common.utils.FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS; +import static org.apache.pinot.common.utils.http.HttpClient.DEFAULT_SOCKET_TIMEOUT_MS; import static org.testcontainers.containers.KafkaContainer.ZOOKEEPER_PORT; import static org.testcontainers.utility.DockerImageName.parse; public class TestingPinotCluster implements Closeable { - public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:0.10.0"; - public static final String PINOT_PREVIOUS_IMAGE_NAME = "apachepinot/pinot:0.9.3-jdk11"; + public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:0.11.0"; + public static final String PINOT_PREVIOUS_IMAGE_NAME = "apachepinot/pinot:0.10.0"; private static final String ZOOKEEPER_INTERNAL_HOST = "zookeeper"; private static final JsonCodec> LIST_JSON_CODEC = listJsonCodec(String.class); @@ -220,7 +220,7 @@ public void addRealTimeTable(InputStream realTimeSpec, String tableName) PinotSuccessResponse response = doWithRetries(() -> httpClient.execute(request, createJsonResponseHandler(PINOT_SUCCESS_RESPONSE_JSON_CODEC)), 10); // Typo in response: https://github.com/apache/incubator-pinot/issues/5566 - checkState(response.getStatus().equals(format("Table %s_REALTIME succesfully added", tableName)), "Unexpected response: '%s'", response.getStatus()); + checkState(response.getStatus().startsWith(format("Table %s_REALTIME succes", tableName)), "Unexpected response: '%s'", response.getStatus()); } public void addOfflineTable(InputStream offlineSpec, String tableName) @@ -237,7 +237,7 @@ public void addOfflineTable(InputStream offlineSpec, String tableName) PinotSuccessResponse response = doWithRetries(() -> httpClient.execute(request, createJsonResponseHandler(PINOT_SUCCESS_RESPONSE_JSON_CODEC)), 10); // Typo in response: https://github.com/apache/incubator-pinot/issues/5566 - checkState(response.getStatus().equals(format("Table %s_OFFLINE succesfully added", tableName)), "Unexpected response: '%s'", response.getStatus()); + checkState(response.getStatus().startsWith(format("Table %s_OFFLINE succes", tableName)), "Unexpected response: '%s'", response.getStatus()); } public void publishOfflineSegment(String tableName, Path segmentPath)