From bd2080f2062957cabf415734b346ab650bb0c073 Mon Sep 17 00:00:00 2001 From: Elon Azoulay Date: Sat, 30 Jul 2022 16:09:53 -0700 Subject: [PATCH 1/2] Add PinotToTrinoTypeConverter --- plugin/trino-pinot/pom.xml | 7 + .../trino/plugin/pinot/PinotColumnHandle.java | 54 ------- .../io/trino/plugin/pinot/PinotMetadata.java | 10 +- .../io/trino/plugin/pinot/PinotModule.java | 1 + .../plugin/pinot/PinotTypeConverter.java | 134 ++++++++++++++++++ .../pinot/query/DynamicTableBuilder.java | 51 ++----- .../plugin/pinot/query/PinotTypeResolver.java | 60 ++------ .../trino/plugin/pinot/TestDynamicTable.java | 46 +++--- .../trino/plugin/pinot/TestPinotMetadata.java | 3 +- .../plugin/pinot/TestPinotQueryBase.java | 6 +- .../plugin/pinot/TestPinotSplitManager.java | 2 +- 11 files changed, 195 insertions(+), 179 deletions(-) create mode 100644 plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml index f5a190ebe53a..145b4b00b43b 100755 --- a/plugin/trino-pinot/pom.xml +++ b/plugin/trino-pinot/pom.xml @@ -521,6 +521,13 @@ test + + io.trino + trino-spi + test-jar + test + + io.trino trino-testing diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java index 3d0b0c406b4d..28e98b631553 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java @@ -18,24 +18,13 @@ import com.google.common.collect.ImmutableMap; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; -import io.trino.spi.type.ArrayType; -import io.trino.spi.type.BigintType; -import io.trino.spi.type.BooleanType; -import io.trino.spi.type.DoubleType; -import io.trino.spi.type.IntegerType; -import io.trino.spi.type.RealType; import io.trino.spi.type.Type; -import 
io.trino.spi.type.VarbinaryType; -import io.trino.spi.type.VarcharType; -import org.apache.pinot.core.operator.transform.TransformResultMetadata; -import org.apache.pinot.spi.data.FieldSpec; import java.util.Objects; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; -import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; import static io.trino.plugin.pinot.PinotMetadata.PINOT_COLUMN_NAME_PROPERTY; import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.quoteIdentifier; import static java.lang.String.format; @@ -94,49 +83,6 @@ public static PinotColumnHandle fromColumnMetadata(ColumnMetadata columnMetadata return new PinotColumnHandle(columnName, columnMetadata.getType()); } - public static Type getTrinoTypeFromPinotType(FieldSpec field) - { - Type type = getTrinoTypeFromPinotType(field.getDataType()); - if (field.isSingleValueField()) { - return type; - } - else { - return new ArrayType(type); - } - } - - public static Type getTrinoTypeFromPinotType(TransformResultMetadata transformResultMetadata) - { - Type type = getTrinoTypeFromPinotType(transformResultMetadata.getDataType()); - if (transformResultMetadata.isSingleValue()) { - return type; - } - return new ArrayType(type); - } - - public static Type getTrinoTypeFromPinotType(FieldSpec.DataType dataType) - { - switch (dataType) { - case BOOLEAN: - return BooleanType.BOOLEAN; - case FLOAT: - return RealType.REAL; - case DOUBLE: - return DoubleType.DOUBLE; - case INT: - return IntegerType.INTEGER; - case LONG: - return BigintType.BIGINT; - case STRING: - return VarcharType.VARCHAR; - case BYTES: - return VarbinaryType.VARBINARY; - default: - break; - } - throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType); - } - @JsonProperty public String getColumnName() { diff --git 
a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java index ac567dd7052b..34fc5e3cac01 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java @@ -76,7 +76,6 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache; import static io.trino.plugin.pinot.PinotColumnHandle.fromColumnMetadata; -import static io.trino.plugin.pinot.PinotColumnHandle.getTrinoTypeFromPinotType; import static io.trino.plugin.pinot.PinotSessionProperties.isAggregationPushdownEnabled; import static io.trino.plugin.pinot.query.AggregateExpression.replaceIdentifier; import static java.util.Locale.ENGLISH; @@ -94,16 +93,19 @@ public class PinotMetadata private final AggregateFunctionRewriter aggregateFunctionRewriter; private final ImplementCountDistinct implementCountDistinct; private final PinotClient pinotClient; + private final PinotTypeConverter typeConverter; @Inject public PinotMetadata( PinotClient pinotClient, PinotConfig pinotConfig, - @ForPinot ExecutorService executor) + @ForPinot ExecutorService executor, + PinotTypeConverter typeConverter) { requireNonNull(pinotConfig, "pinot config"); this.pinotClient = requireNonNull(pinotClient, "pinotClient is null"); long metadataCacheExpiryMillis = pinotConfig.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS); + this.typeConverter = requireNonNull(typeConverter, "typeConverter is null"); this.pinotTableColumnCache = buildNonEvictableCache( CacheBuilder.newBuilder() .refreshAfterWrite(metadataCacheExpiryMillis, TimeUnit.MILLISECONDS), @@ -143,7 +145,7 @@ public List listSchemaNames(ConnectorSession session) public PinotTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { if 
(tableName.getTableName().trim().startsWith("select ")) { - DynamicTable dynamicTable = DynamicTableBuilder.buildFromPql(this, tableName, pinotClient); + DynamicTable dynamicTable = DynamicTableBuilder.buildFromPql(this, tableName, pinotClient, typeConverter); return new PinotTableHandle(tableName.getSchemaName(), dynamicTable.getTableName(), TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable)); } String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableNameIfExists(tableName.getTableName()); @@ -517,7 +519,7 @@ private List getPinotColumnMetadataForPinotSchema(Schema pinotTa .filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL .map(columnName -> ColumnMetadata.builder() .setName(columnName) - .setType(getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName))) + .setType(typeConverter.toTrinoType(pinotTableSchema.getFieldSpecFor(columnName))) .setProperties(ImmutableMap.builder() .put(PINOT_COLUMN_NAME_PROPERTY, columnName) .buildOrThrow()) diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java index e7709f8ba119..096c1494b405 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java @@ -77,6 +77,7 @@ public void setup(Binder binder) binder.bind(PinotSplitManager.class).in(Scopes.SINGLETON); binder.bind(PinotPageSourceProvider.class).in(Scopes.SINGLETON); binder.bind(PinotClient.class).in(Scopes.SINGLETON); + binder.bind(PinotTypeConverter.class).in(Scopes.SINGLETON); binder.bind(ExecutorService.class).annotatedWith(ForPinot.class) .toInstance(newCachedThreadPool(threadsNamed("pinot-metadata-fetcher-" + catalogName))); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java 
b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java new file mode 100644 index 000000000000..bfa7c853b11e --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot; + +import com.google.common.base.Suppliers; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; +import io.trino.spi.type.DoubleType; +import io.trino.spi.type.IntegerType; +import io.trino.spi.type.RealType; +import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; +import io.trino.spi.type.TypeSignature; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.spi.data.FieldSpec; + +import javax.inject.Inject; + +import java.util.Optional; +import java.util.function.Supplier; + +import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.StandardTypes.JSON; +import static io.trino.spi.type.VarbinaryType.VARBINARY; +import 
static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public class PinotTypeConverter +{ + // Supplier is used for compatibility unit tests using TestingTypeManager. + // TestingTypeManager does not support json type. + private final Supplier jsonTypeSupplier; + + @Inject + public PinotTypeConverter(TypeManager typeManager) + { + requireNonNull(typeManager, "typeManager is null"); + this.jsonTypeSupplier = Suppliers.memoize(() -> typeManager.getType(new TypeSignature(JSON))); + } + + public Type toTrinoType(FieldSpec field) + { + return toTrinoType(field.getDataType(), field.isSingleValueField()); + } + + public Type toTrinoType(TransformResultMetadata transformResultMetadata) + { + return toTrinoType(transformResultMetadata.getDataType(), transformResultMetadata.isSingleValue()); + } + + private Type toTrinoType(FieldSpec.DataType dataType, boolean isSingleValue) + { + Type type = toTrinoType(dataType); + if (isSingleValue) { + return type; + } + return new ArrayType(type); + } + + private Type toTrinoType(FieldSpec.DataType dataType) + { + switch (dataType) { + case BOOLEAN: + return BooleanType.BOOLEAN; + case FLOAT: + return RealType.REAL; + case DOUBLE: + return DoubleType.DOUBLE; + case INT: + return IntegerType.INTEGER; + case LONG: + return BigintType.BIGINT; + case STRING: + return VarcharType.VARCHAR; + case JSON: + return jsonTypeSupplier.get(); + case BYTES: + return VarbinaryType.VARBINARY; + default: + break; + } + throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType); + } + + public Type toTrinoType(DataSchema.ColumnDataType columnDataType) + { + switch (columnDataType) { + case INT: + return INTEGER; + case LONG: + return BIGINT; + case FLOAT: + return REAL; + case DOUBLE: + return DOUBLE; + case STRING: + return VARCHAR; + case JSON: + return jsonTypeSupplier.get(); + case BYTES: + return VARBINARY; + case INT_ARRAY: + 
return new ArrayType(INTEGER); + case LONG_ARRAY: + return new ArrayType(BIGINT); + case DOUBLE_ARRAY: + return new ArrayType(DOUBLE); + case STRING_ARRAY: + return new ArrayType(VARCHAR); + default: + break; + } + throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + columnDataType); + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java index 18c444f82876..848ab072ce2c 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java @@ -18,6 +18,7 @@ import io.trino.plugin.pinot.PinotColumnHandle; import io.trino.plugin.pinot.PinotException; import io.trino.plugin.pinot.PinotMetadata; +import io.trino.plugin.pinot.PinotTypeConverter; import io.trino.plugin.pinot.client.PinotClient; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.SchemaTableName; @@ -46,19 +47,11 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.pinot.PinotColumnHandle.fromNonAggregateColumnHandle; -import static io.trino.plugin.pinot.PinotColumnHandle.getTrinoTypeFromPinotType; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; -import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; import static io.trino.plugin.pinot.query.PinotExpressionRewriter.rewriteExpression; import static io.trino.plugin.pinot.query.PinotPatterns.WILDCARD; import static io.trino.plugin.pinot.query.PinotSqlFormatter.formatExpression; import static io.trino.plugin.pinot.query.PinotSqlFormatter.formatFilter; -import static io.trino.spi.type.BigintType.BIGINT; -import static io.trino.spi.type.DoubleType.DOUBLE; -import static 
io.trino.spi.type.IntegerType.INTEGER; -import static io.trino.spi.type.RealType.REAL; -import static io.trino.spi.type.VarbinaryType.VARBINARY; -import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -78,10 +71,11 @@ private DynamicTableBuilder() { } - public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTableName schemaTableName, PinotClient pinotClient) + public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTableName schemaTableName, PinotClient pinotClient, PinotTypeConverter typeConverter) { requireNonNull(pinotMetadata, "pinotMetadata is null"); requireNonNull(schemaTableName, "schemaTableName is null"); + requireNonNull(typeConverter, "typeConverter is null"); String query = schemaTableName.getTableName(); BrokerRequest request = REQUEST_COMPILER.compileToBrokerRequest(query); PinotQuery pinotQuery = request.getPinotQuery(); @@ -93,13 +87,13 @@ public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTable Map columnHandles = pinotMetadata.getPinotColumnHandles(trinoTableName); List orderBy = ImmutableList.of(); - PinotTypeResolver pinotTypeResolver = new PinotTypeResolver(pinotClient, pinotTableName); + PinotTypeResolver pinotTypeResolver = new PinotTypeResolver(pinotClient, typeConverter, pinotTableName); List selectColumns = ImmutableList.of(); Map aggregateTypes = ImmutableMap.of(); if (queryContext.getAggregationFunctions() != null) { checkState(queryContext.getAggregationFunctions().length > 0, "Aggregation Functions is empty"); - aggregateTypes = getAggregateTypes(schemaTableName, queryContext, columnHandles); + aggregateTypes = getAggregateTypes(schemaTableName, queryContext, columnHandles, typeConverter); } if (queryContext.getSelectExpressions() != null) { @@ -131,35 +125,6 @@ public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTable 
return new DynamicTable(pinotTableName, suffix, selectColumns, filter, groupByColumns, ImmutableList.of(), orderBy, OptionalLong.of(queryContext.getLimit()), getOffset(queryContext), query); } - private static Type toTrinoType(DataSchema.ColumnDataType columnDataType) - { - switch (columnDataType) { - case INT: - return INTEGER; - case LONG: - return BIGINT; - case FLOAT: - return REAL; - case DOUBLE: - return DOUBLE; - case STRING: - return VARCHAR; - case BYTES: - return VARBINARY; - case INT_ARRAY: - return new ArrayType(INTEGER); - case LONG_ARRAY: - return new ArrayType(BIGINT); - case DOUBLE_ARRAY: - return new ArrayType(DOUBLE); - case STRING_ARRAY: - return new ArrayType(VARCHAR); - default: - break; - } - throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + columnDataType); - } - private static List getPinotColumns(SchemaTableName schemaTableName, List expressions, List aliases, Map columnHandles, PinotTypeResolver pinotTypeResolver, Map aggregateTypes) { ImmutableList.Builder pinotColumnsBuilder = ImmutableList.builder(); @@ -194,7 +159,7 @@ private static PinotColumnHandle getPinotColumnHandle(SchemaTableName schemaTabl columnName = aggregateTypes.get(columnName).getPinotColumnName(); } else { - trinoType = getTrinoTypeFromPinotType(pinotTypeResolver.resolveExpressionType(rewritten, schemaTableName, columnHandles)); + trinoType = pinotTypeResolver.resolveExpressionType(rewritten, schemaTableName, columnHandles); if (!aggregateTypes.isEmpty() && trinoType instanceof ArrayType) { trinoType = ((ArrayType) trinoType).getElementType(); } @@ -237,7 +202,7 @@ private static boolean hasAggregate(ExpressionContext expressionContext) throw new PinotException(PINOT_EXCEPTION, Optional.empty(), format("Unsupported expression type '%s'", expressionContext.getType())); } - private static Map getAggregateTypes(SchemaTableName schemaTableName, QueryContext queryContext, Map columnHandles) + private static Map 
getAggregateTypes(SchemaTableName schemaTableName, QueryContext queryContext, Map columnHandles, PinotTypeConverter typeConverter) { // A mapping from pinot expression to the returned pinot column name and trino type // Note: the column name is set by the PostAggregationHandler @@ -261,7 +226,7 @@ private static Map getAggregateTypes(Schema columnHandles).toString(), new PinotColumnNameAndTrinoType( postAggregationSchema.getColumnName(index), - toTrinoType(postAggregationSchema.getColumnDataType(index)))); + typeConverter.toTrinoType(postAggregationSchema.getColumnDataType(index)))); } return aggregationTypesBuilder.buildOrThrow(); } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java index 2600a3d7a926..acc0d3e828ab 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java @@ -15,19 +15,12 @@ import io.trino.plugin.pinot.PinotColumnHandle; import io.trino.plugin.pinot.PinotException; +import io.trino.plugin.pinot.PinotTypeConverter; import io.trino.plugin.pinot.client.PinotClient; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnNotFoundException; import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.type.ArrayType; -import io.trino.spi.type.BigintType; -import io.trino.spi.type.BooleanType; -import io.trino.spi.type.DoubleType; -import io.trino.spi.type.IntegerType; -import io.trino.spi.type.RealType; import io.trino.spi.type.Type; -import io.trino.spi.type.VarbinaryType; -import io.trino.spi.type.VarcharType; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.LiteralTransformFunction; @@ -42,18 +35,19 @@ import static 
com.google.common.collect.ImmutableMap.toImmutableMap; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_INVALID_PQL_GENERATED; -import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; public class PinotTypeResolver { + private final PinotTypeConverter typeConverter; private final Map datasourceMap; - public PinotTypeResolver(PinotClient pinotClient, String pinotTableName) + public PinotTypeResolver(PinotClient pinotClient, PinotTypeConverter typeConverter, String pinotTableName) { requireNonNull(pinotClient, "pinotClient is null"); + this.typeConverter = requireNonNull(typeConverter, "typeConverter is null"); this.datasourceMap = getDataSourceMap(pinotClient, pinotTableName); } @@ -71,7 +65,7 @@ private static Map getDataSourceMap(PinotClient pinotClient, } } - public TransformResultMetadata resolveExpressionType(ExpressionContext expression, SchemaTableName schemaTableName, Map columnHandles) + public Type resolveExpressionType(ExpressionContext expression, SchemaTableName schemaTableName, Map columnHandles) { switch (expression.getType()) { case IDENTIFIER: @@ -79,52 +73,14 @@ public TransformResultMetadata resolveExpressionType(ExpressionContext expressio if (columnHandle == null) { throw new ColumnNotFoundException(schemaTableName, expression.getIdentifier()); } - return fromTrinoType(columnHandle.getDataType()); + return columnHandle.getDataType(); case FUNCTION: - return TransformFunctionFactory.get(expression, datasourceMap).getResultMetadata(); + return typeConverter.toTrinoType(TransformFunctionFactory.get(expression, datasourceMap).getResultMetadata()); case LITERAL: FieldSpec.DataType literalDataType = new LiteralTransformFunction(expression.getLiteral()).getResultMetadata().getDataType(); - return new TransformResultMetadata(literalDataType, true, false); + return typeConverter.toTrinoType(new 
TransformResultMetadata(literalDataType, true, false)); default: throw new PinotException(PINOT_INVALID_PQL_GENERATED, Optional.empty(), format("Unsupported expression: '%s'", expression)); } } - - public static TransformResultMetadata fromTrinoType(Type type) - { - if (type instanceof ArrayType) { - ArrayType arrayType = (ArrayType) type; - Type elementType = arrayType.getElementType(); - return new TransformResultMetadata(fromPrimitiveTrinoType(elementType), false, false); - } - else { - return new TransformResultMetadata(fromPrimitiveTrinoType(type), true, false); - } - } - - private static FieldSpec.DataType fromPrimitiveTrinoType(Type type) - { - if (type instanceof VarcharType) { - return FieldSpec.DataType.STRING; - } - if (type instanceof BigintType) { - return FieldSpec.DataType.LONG; - } - if (type instanceof IntegerType) { - return FieldSpec.DataType.INT; - } - if (type instanceof DoubleType) { - return FieldSpec.DataType.DOUBLE; - } - if (type instanceof RealType) { - return FieldSpec.DataType.FLOAT; - } - if (type instanceof BooleanType) { - return FieldSpec.DataType.BOOLEAN; - } - if (type instanceof VarbinaryType) { - return FieldSpec.DataType.BYTES; - } - throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + type); - } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java index f1a359122a4b..772ace2f98e7 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java @@ -61,7 +61,7 @@ public void testSelectNoFilter() orderByColumns.stream() .collect(joining(", ")) + " desc", limit); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new 
SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); assertEquals(dynamicTable.getProjections().stream() .map(PinotColumnHandle::getColumnName) .collect(toImmutableList()), @@ -77,7 +77,7 @@ public void testGroupBy() String tableName = realtimeOnlyTable.getTableName(); long limit = 25; String query = format("SELECT Origin, AirlineID, max(CarrierDelay), avg(CarrierDelay) FROM %s GROUP BY Origin, AirlineID LIMIT %s", tableName, limit); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); assertEquals(dynamicTable.getGroupingColumns().stream() .map(PinotColumnHandle::getColumnName) .collect(toImmutableList()), @@ -102,7 +102,7 @@ public void testFilter() "AND((\"OriginCityName\") != 'catfish paradise', (\"OriginState\") != 'az', (\"AirTime\") BETWEEN '1' AND '5', \"AirTime\" NOT IN ('7', '8', '9')), " + "AND((\"DepDelayMinutes\") < '10', (\"Distance\") >= '3', (\"ArrDelay\") > '4', (\"SecurityDelay\") < '5', (\"LateAircraftDelay\") <= '7')) limit 60", tableName); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expected); } @@ -121,7 +121,7 @@ public void testPrimitiveTypes() String expected = "select \"string_col\", \"long_col\", \"int_col\", \"bool_col\", \"double_col\", \"float_col\", \"bytes_col\"" + " from primitive_types_table where AND((\"string_col\") = 'string', (\"long_col\") = '12345678901'," + " (\"int_col\") = '123456789', (\"double_col\") = '3.56', (\"float_col\") = '3.56', (\"bytes_col\") = 'abcd') limit 60"; - 
DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expected); } @@ -132,7 +132,7 @@ public void testDoubleWithScientificNotation() String tableName = "primitive_types_table"; String query = "SELECT string_col FROM " + tableName + " WHERE double_col = 3.5E5"; String expected = "select \"string_col\" from primitive_types_table where (\"double_col\") = '350000.0' limit 10"; - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expected); } @@ -144,7 +144,7 @@ public void testFilterWithCast() " FROM " + tableName + " WHERE string_col = CAST(123 AS STRING) AND long_col = CAST('123' AS LONG) LIMIT 60"; String expected = "select \"string_col\", \"long_col\" from primitive_types_table " + "where AND((\"string_col\") = (CAST('123' AS string)), (\"long_col\") = (CAST('123' AS long))) limit 60"; - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expected); } @@ -161,7 +161,7 @@ public void testFilterWithCaseStatements() "THEN 'pizza' WHEN equals(\"OriginCityName\", 'la') THEN 'burrito' WHEN equals(\"OriginCityName\", 'boston') " + "THEN 'clam chowder' ELSE 'burger' END) != 'salad') limit 10", tableName); - 
DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expected); } @@ -170,7 +170,7 @@ public void testFilterWithPushdownConstraint() { String tableName = realtimeOnlyTable.getTableName(); String query = format("select FlightNum from %s limit 60", tableName.toLowerCase(ENGLISH)); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); PinotColumnHandle columnHandle = new PinotColumnHandle("OriginCityName", VARCHAR); TupleDomain tupleDomain = TupleDomain.withColumnDomains(ImmutableMap.builder() .put(columnHandle, @@ -187,7 +187,7 @@ public void testFilterWithUdf() { String tableName = realtimeOnlyTable.getTableName(); String query = format("select FlightNum from %s where DivLongestGTimes = FLOOR(EXP(2 * LN(3))) AND 5 < EXP(CarrierDelay) limit 60", tableName.toLowerCase(ENGLISH)); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = "select \"FlightNum\" from realtimeOnly where AND((\"DivLongestGTimes\") = '9.0', (exp(\"CarrierDelay\")) > '5') limit 60"; assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); } @@ -197,7 +197,7 @@ public void testSelectStarDynamicTable() { String tableName = realtimeOnlyTable.getTableName(); String query = format("select * from %s limit 70", tableName.toLowerCase(ENGLISH)); 
- DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select %s from %s limit 70", getColumnNames(tableName).stream().map(TestDynamicTable::quoteIdentifier).collect(joining(", ")), tableName); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); } @@ -208,7 +208,7 @@ public void testOfflineDynamicTable() String tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + OFFLINE_SUFFIX; String query = format("select * from %s limit 70", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select %s from %s limit 70", getColumnNames(tableName).stream().map(TestDynamicTable::quoteIdentifier).collect(joining(", ")), tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -220,7 +220,7 @@ public void testRealtimeOnlyDynamicTable() String tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select * from %s limit 70", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select %s from %s limit 70", 
getColumnNames(tableName).stream().map(TestDynamicTable::quoteIdentifier).collect(joining(", ")), tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -232,7 +232,7 @@ public void testLimitAndOffset() String tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select * from %s limit 70, 40", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select %s from %s limit 70, 40", getColumnNames(tableName).stream().map(TestDynamicTable::quoteIdentifier).collect(joining(", ")), tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -249,7 +249,7 @@ public void testRegexpLike() String tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select origincityname from %s where regexp_like(origincityname, '.*york.*') limit 70", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select \"OriginCityName\" from %s where regexp_like(\"OriginCityName\", '.*york.*') limit 70", tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -261,7 +261,7 @@ public void testTextMatch() String 
tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select origincityname from %s where text_match(origincityname, 'new AND york') limit 70", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select \"OriginCityName\" from %s where text_match(\"OriginCityName\", 'new and york') limit 70", tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -273,7 +273,7 @@ public void testJsonMatch() String tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select origincityname from %s where json_match(origincityname, '\"$.name\"=''new york''') limit 70", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select \"OriginCityName\" from %s where json_match(\"OriginCityName\", '\"$.name\"=''new york''') limit 70", tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -290,7 +290,7 @@ public void testSelectExpressionsWithAliases() " timeconvert(dayssinceEpoch, 'seconds', 'minutes') as foo" + " from %s limit 70", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = 
buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select datetimeconvert(\"DaysSinceEpoch\", '1:SECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '15:MINUTES')," + " not_equals(CASE WHEN equals(\"OriginCityName\", 'nyc') THEN 'pizza' WHEN equals(\"OriginCityName\", 'la') THEN 'burrito' WHEN equals(\"OriginCityName\", 'boston') THEN 'clam chowder' ELSE 'burger' END, 'salad')," + " timeconvert(\"DaysSinceEpoch\", 'SECONDS', 'MINUTES') AS \"foo\" from %s limit 70", tableNameWithSuffix); @@ -311,7 +311,7 @@ public void testAggregateExpressionsWithAliases() " max(airtime) as baz" + " from %s group by 1, 3, 4 limit 70", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select datetimeconvert(\"DaysSinceEpoch\", '1:SECONDS:EPOCH'," + " '1:MILLISECONDS:EPOCH', '15:MINUTES'), count(*) AS \"bar\"," + " not_equals(CASE WHEN equals(\"OriginCityName\", 'nyc') THEN 'pizza' WHEN equals(\"OriginCityName\", 'la') THEN 'burrito'" + @@ -333,7 +333,7 @@ public void testOrderBy() String tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select ArrDelay + 34 - DaysSinceEpoch, FlightNum from %s order by ArrDelay asc, DaysSinceEpoch desc", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select plus(\"ArrDelay\", '34') - \"DaysSinceEpoch\", \"FlightNum\" from %s order by \"ArrDelay\", \"DaysSinceEpoch\" desc 
limit 10", tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -345,7 +345,7 @@ public void testOrderByCountStar() String tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select count(*) from %s order by count(*)", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select count(*) from %s order by count(*) limit 10", tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -357,7 +357,7 @@ public void testOrderByExpression() String tableName = hybridTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select ArrDelay + 34 - DaysSinceEpoch, FlightNum from %s order by ArrDelay + 34 - DaysSinceEpoch desc", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select plus(\"ArrDelay\", '34') - \"DaysSinceEpoch\", \"FlightNum\" from %s order by plus(\"ArrDelay\", '34') - \"DaysSinceEpoch\" desc limit 10", tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -369,7 +369,7 @@ public void testQuotesInAlias() String tableName = "quotes_in_column_names"; String tableNameWithSuffix = 
tableName + REALTIME_SUFFIX; String query = format("select non_quoted AS \"non\"\"quoted\" from %s limit 50", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select \"non_quoted\" AS \"non\"\"quoted\" from %s limit 50", tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); @@ -381,7 +381,7 @@ public void testQuotesInColumnName() String tableName = "quotes_in_column_names"; String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = format("select \"qu\"\"ot\"\"ed\" from %s limit 50", tableNameWithSuffix); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = format("select \"qu\"\"ot\"\"ed\" from %s limit 50", tableNameWithSuffix); assertEquals(extractPql(dynamicTable, TupleDomain.all(), ImmutableList.of()), expectedPql); assertEquals(dynamicTable.getTableName(), tableName); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotMetadata.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotMetadata.java index 96a48b486e79..7249cb5e6301 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotMetadata.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotMetadata.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableSet; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; +import 
io.trino.spi.type.TestingTypeManager; import org.testng.annotations.Test; import java.util.List; @@ -29,7 +30,7 @@ public class TestPinotMetadata { private final PinotConfig pinotConfig = new PinotConfig().setControllerUrls("localhost:9000"); - private final PinotMetadata metadata = new PinotMetadata(new MockPinotClient(pinotConfig), pinotConfig, Executors.newSingleThreadExecutor()); + private final PinotMetadata metadata = new PinotMetadata(new MockPinotClient(pinotConfig), pinotConfig, Executors.newSingleThreadExecutor(), new PinotTypeConverter(new TestingTypeManager())); @Test public void testTables() diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotQueryBase.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotQueryBase.java index 3c576f339fd8..ce09dd7c6e22 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotQueryBase.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotQueryBase.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.plugin.pinot.client.PinotClient; +import io.trino.spi.type.TestingTypeManager; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.Schema.SchemaBuilder; @@ -28,6 +29,8 @@ public class TestPinotQueryBase { + protected static final PinotTypeConverter TESTING_TYPE_CONVERTER = new PinotTypeConverter(new TestingTypeManager()); + protected static PinotTableHandle realtimeOnlyTable = new PinotTableHandle("schema", "realtimeOnly"); protected static PinotTableHandle hybridTable = new PinotTableHandle("schema", "hybrid"); @@ -37,7 +40,8 @@ public class TestPinotQueryBase protected final PinotMetadata pinotMetadata = new PinotMetadata( mockClusterInfoFetcher, pinotConfig, - newCachedThreadPool(threadsNamed("mock-pinot-metadata-fetcher"))); + newCachedThreadPool(threadsNamed("mock-pinot-metadata-fetcher")), + TESTING_TYPE_CONVERTER); protected List 
getColumnNames(String table) { diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSplitManager.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSplitManager.java index cb8821447370..d47f33eb8219 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSplitManager.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSplitManager.java @@ -51,7 +51,7 @@ public class TestPinotSplitManager public void testSplitsBroker() { SchemaTableName schemaTableName = new SchemaTableName("default", format("SELECT %s, %s FROM %s LIMIT %d", "AirlineID", "OriginStateName", "airlineStats", 100)); - DynamicTable dynamicTable = buildFromPql(pinotMetadata, schemaTableName, mockClusterInfoFetcher); + DynamicTable dynamicTable = buildFromPql(pinotMetadata, schemaTableName, mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); PinotTableHandle pinotTableHandle = new PinotTableHandle("default", dynamicTable.getTableName(), TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable)); List splits = getSplitsHelper(pinotTableHandle, 1, false); From 570e8c46611e65800dc22b2f0482aa0b1bc039b5 Mon Sep 17 00:00:00 2001 From: Elon Azoulay Date: Tue, 16 Aug 2022 19:19:55 -0700 Subject: [PATCH 2/2] Add JSON type support to the Pinot Connector Pinot does not yet support filtering on json type. 
--- docs/src/main/sphinx/connector/pinot.rst | 1 + .../io/trino/plugin/pinot/PinotMetadata.java | 10 ++++- .../plugin/pinot/PinotSegmentPageSource.java | 6 +++ .../plugin/pinot/PinotTypeConverter.java | 6 +++ .../plugin/pinot/decoders/DecoderFactory.java | 4 ++ .../plugin/pinot/decoders/JsonDecoder.java | 45 +++++++++++++++++++ .../AbstractPinotIntegrationSmokeTest.java | 45 +++++++++++++++++++ .../src/test/resources/json_offlineSpec.json | 31 +++++++++++++ .../src/test/resources/json_realtimeSpec.json | 44 ++++++++++++++++++ .../src/test/resources/json_schema.json | 24 ++++++++++ 10 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/JsonDecoder.java create mode 100644 plugin/trino-pinot/src/test/resources/json_offlineSpec.json create mode 100644 plugin/trino-pinot/src/test/resources/json_realtimeSpec.json create mode 100644 plugin/trino-pinot/src/test/resources/json_schema.json diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst index eecac02b6b5c..92a39963dd2e 100644 --- a/docs/src/main/sphinx/connector/pinot.rst +++ b/docs/src/main/sphinx/connector/pinot.rst @@ -167,6 +167,7 @@ Pinot Trino ``DOUBLE`` ``DOUBLE`` ``STRING`` ``VARCHAR`` ``BYTES`` ``VARBINARY`` +``JSON`` ``JSON`` ``INT_ARRAY`` ``VARCHAR`` ``LONG_ARRAY`` ``VARCHAR`` ``FLOAT_ARRAY`` ``VARCHAR`` diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java index 34fc5e3cac01..7b59f7afea81 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java @@ -54,6 +54,7 @@ import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.ArrayType; +import io.trino.spi.type.Type; import org.apache.pinot.spi.data.Schema; import 
javax.inject.Inject; @@ -290,8 +291,13 @@ public Optional> applyFilter(C Map supported = new HashMap<>(); Map unsupported = new HashMap<>(); for (Map.Entry entry : domains.entrySet()) { - // Pinot does not support array literals - if (((PinotColumnHandle) entry.getKey()).getDataType() instanceof ArrayType) { + Type columnType = ((PinotColumnHandle) entry.getKey()).getDataType(); + if (columnType instanceof ArrayType) { + // Pinot does not support array literals + unsupported.put(entry.getKey(), entry.getValue()); + } + else if (typeConverter.isJsonType(columnType)) { + // Pinot does not support filtering on json values unsupported.put(entry.getKey(), entry.getValue()); } else { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java index fb74a017c221..7e35f909bc9e 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java @@ -23,6 +23,7 @@ import io.trino.spi.block.Block; import io.trino.spi.block.BlockBuilder; import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.type.StandardTypes; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; @@ -35,6 +36,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; +import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; import static io.trino.plugin.pinot.decoders.VarbinaryDecoder.toBytes; @@ -328,6 +330,10 @@ private Slice getSlice(int rowIndex, int columnIndex) else if (trinoType instanceof VarbinaryType) { return 
Slices.wrappedBuffer(toBytes(currentDataTable.getDataTable().getString(rowIndex, columnIndex))); } + else if (trinoType.getTypeSignature().getBase().equals(StandardTypes.JSON)) { + String field = currentDataTable.getDataTable().getString(rowIndex, columnIndex); + return jsonParse(getUtf8Slice(field)); + } return Slices.EMPTY_SLICE; } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java index bfa7c853b11e..98fb687ef923 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java @@ -131,4 +131,10 @@ public Type toTrinoType(DataSchema.ColumnDataType columnDataType) } throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + columnDataType); } + + public boolean isJsonType(Type type) + { + requireNonNull(type, "type is null"); + return type.equals(jsonTypeSupplier.get()); + } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java index dbc957e69f6e..ffc72dd8d668 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java @@ -27,6 +27,7 @@ import java.util.Optional; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; +import static io.trino.spi.type.StandardTypes.JSON; import static java.util.Objects.requireNonNull; public class DecoderFactory @@ -64,6 +65,9 @@ else if (type instanceof ArrayType) { else if (type instanceof VarbinaryType) { return new VarbinaryDecoder(); } + else if (type.getTypeSignature().getBase().equals(JSON)) { + return new JsonDecoder(); + } else { return new VarcharDecoder(); } diff --git
a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/JsonDecoder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/JsonDecoder.java new file mode 100644 index 000000000000..dcd0280be60c --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/JsonDecoder.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot.decoders; + +import io.airlift.slice.Slice; +import io.trino.spi.TrinoException; +import io.trino.spi.block.BlockBuilder; + +import java.util.function.Supplier; + +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse; +import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH; +import static java.lang.String.format; + +public class JsonDecoder + implements Decoder +{ + @Override + public void decode(Supplier getter, BlockBuilder output) + { + Object value = getter.get(); + if (value == null) { + output.appendNull(); + } + else if (value instanceof String) { + Slice slice = jsonParse(utf8Slice((String) value)); + output.writeBytes(slice, 0, slice.length()).closeEntry(); + } + else { + throw new TrinoException(TYPE_MISMATCH, format("Expected a json value of type STRING: %s [%s]", value, value.getClass().getSimpleName())); + } + } +} diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java 
b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java index f8bee993fff5..73c965d6622f 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java @@ -112,6 +112,7 @@ public abstract class AbstractPinotIntegrationSmokeTest private static final String DUPLICATE_TABLE_LOWERCASE = "dup_table"; private static final String DUPLICATE_TABLE_MIXED_CASE = "dup_Table"; private static final String JSON_TABLE = "my_table"; + private static final String JSON_TYPE_TABLE = "json_table"; private static final String RESERVED_KEYWORD_TABLE = "reserved_keyword"; private static final String QUOTES_IN_COLUMN_NAME_TABLE = "quotes_in_column_name"; private static final String DUPLICATE_VALUES_IN_COLUMNS_TABLE = "duplicate_values_in_columns"; @@ -310,6 +311,28 @@ protected QueryRunner createQueryRunner() pinot.createSchema(getClass().getClassLoader().getResourceAsStream("date_time_fields_schema.json"), DATE_TIME_FIELDS_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("date_time_fields_realtimeSpec.json"), DATE_TIME_FIELDS_TABLE); + // Create json type table + kafka.createTopic(JSON_TYPE_TABLE); + + Schema jsonTableAvroSchema = SchemaBuilder.record(JSON_TYPE_TABLE).fields() + .name("string_col").type().optional().stringType() + .name("json_col").type().optional().stringType() + .name("updatedAt").type().optional().longType() + .endRecord(); + + ImmutableList.Builder> jsonTableRecordsBuilder = ImmutableList.builder(); + for (int i = 0; i < 3; i++) { + jsonTableRecordsBuilder.add(new ProducerRecord<>(JSON_TYPE_TABLE, "key" + i, new GenericRecordBuilder(jsonTableAvroSchema) + .set("string_col", "string_" + i) + .set("json_col", "{ \"name\": \"user_" + i + "\", \"id\": " + i + "}") + .set("updatedAt", initialUpdatedAt.plusMillis(i * 1000).toEpochMilli()) + .build())); + } + 
kafka.sendMessages(jsonTableRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); + pinot.createSchema(getClass().getClassLoader().getResourceAsStream("json_schema.json"), JSON_TYPE_TABLE); + pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("json_realtimeSpec.json"), JSON_TYPE_TABLE); + pinot.addOfflineTable(getClass().getClassLoader().getResourceAsStream("json_offlineSpec.json"), JSON_TYPE_TABLE); + // Create json table kafka.createTopic(JSON_TABLE); long key = 0L; @@ -2203,4 +2226,26 @@ public void testVarbinary() assertThat(query("SELECT bytes_col FROM \"SELECT bytes_col, string_col FROM alltypes\" WHERE string_col != 'array_null'")) .matches(expectedValues); } + + @Test + public void testJson() + { + assertThat(query("SELECT json_col FROM " + JSON_TYPE_TABLE)) + .matches("VALUES (JSON '{\"id\":0,\"name\":\"user_0\"}')," + + " (JSON '{\"id\":1,\"name\":\"user_1\"}')," + + " (JSON '{\"id\":2,\"name\":\"user_2\"}')"); + assertThat(query("SELECT json_col" + + " FROM \"SELECT json_col FROM " + JSON_TYPE_TABLE + "\"")) + .matches("VALUES (JSON '{\"id\":0,\"name\":\"user_0\"}')," + + " (JSON '{\"id\":1,\"name\":\"user_1\"}')," + + " (JSON '{\"id\":2,\"name\":\"user_2\"}')"); + assertThat(query("SELECT name FROM \"SELECT json_extract_scalar(json_col, '$.name', 'STRING', '0') AS name" + + " FROM json_table WHERE json_extract_scalar(json_col, '$.id', 'INT', '0') = '1'\"")) + .matches("VALUES (VARCHAR 'user_1')"); + assertThat(query("SELECT JSON_EXTRACT_SCALAR(json_col, '$.name') FROM " + JSON_TYPE_TABLE + + " WHERE JSON_EXTRACT_SCALAR(json_col, '$.id') = '1'")) + .matches("VALUES (VARCHAR 'user_1')"); + assertThat(query("SELECT string_col FROM " + JSON_TYPE_TABLE + " WHERE json_col = JSON '{\"id\":0,\"name\":\"user_0\"}'")) + .matches("VALUES VARCHAR 'string_0'"); + } } diff --git a/plugin/trino-pinot/src/test/resources/json_offlineSpec.json b/plugin/trino-pinot/src/test/resources/json_offlineSpec.json new file mode 100644 index 
000000000000..0f93e1e147b2 --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/json_offlineSpec.json @@ -0,0 +1,31 @@ +{ + "tableName": "json_table", + "tableType": "OFFLINE", + "segmentsConfig": { + "timeColumnName": "updated_at_seconds", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "365", + "segmentPushType": "APPEND", + "segmentPushFrequency": "daily", + "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", + "replication": "1" + }, + "tenants": { + "broker": "DefaultTenant", + "server": "DefaultTenant" + }, + "tableIndexConfig": { + "loadMode": "MMAP", + "invertedIndexColumns": ["string_col"], + "sortedColumn": ["updated_at_seconds"], + "noDictionaryColumns": ["json_col"], + "starTreeIndexConfigs": [], + "aggregateMetrics": "true", + "nullHandlingEnabled": "true" + }, + "metadata": { + "customConfigs": { + "owner": "analytics@example.com" + } + } +} diff --git a/plugin/trino-pinot/src/test/resources/json_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/json_realtimeSpec.json new file mode 100644 index 000000000000..d52d2decff3e --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/json_realtimeSpec.json @@ -0,0 +1,44 @@ +{ + "tableName": "json_table", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "updated_at_seconds", + "timeType": "SECONDS", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "365", + "segmentPushType": "APPEND", + "segmentPushFrequency": "daily", + "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", + "schemaName": "json_table", + "replicasPerPartition": "1" + }, + "tenants": { + "broker": "DefaultTenant", + "server": "DefaultTenant" + }, + "tableIndexConfig": { + "loadMode": "MMAP", + "invertedIndexColumns": ["string_col"], + "sortedColumn": ["updated_at_seconds"], + "noDictionaryColumns": ["json_col"], + "streamConfigs": { + "streamType": "kafka", + "stream.kafka.consumer.type": "LowLevel", + "stream.kafka.topic.name": "json_table", + 
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", + "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", + "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.zk.broker.url": "zookeeper:2181/", + "stream.kafka.broker.list": "kafka:9092", + "realtime.segment.flush.threshold.time": "1m", + "realtime.segment.flush.threshold.size": "0", + "realtime.segment.flush.desired.size": "1M", + "isolation.level": "read_committed", + "stream.kafka.consumer.prop.auto.offset.reset": "smallest", + "stream.kafka.consumer.prop.group.id": "json_table" + } + }, + "metadata": { + "customConfigs": {} + } +} diff --git a/plugin/trino-pinot/src/test/resources/json_schema.json b/plugin/trino-pinot/src/test/resources/json_schema.json new file mode 100644 index 000000000000..a6b1c20db3a4 --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/json_schema.json @@ -0,0 +1,24 @@ +{ + "schemaName": "json_table", + "dimensionFieldSpecs": [ + { + "name": "string_col", + "dataType": "STRING" + }, + { + "name": "json_col", + "dataType": "JSON", + "maxLength": 2147483647 + } + ], + "dateTimeFieldSpecs": [ + { + "name": "updated_at_seconds", + "dataType": "LONG", + "defaultNullValue" : 0, + "format": "1:SECONDS:EPOCH", + "transformFunction": "toEpochSeconds(updatedAt)", + "granularity" : "1:SECONDS" + } + ] +}