Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ Pinot Trino
``DOUBLE`` ``DOUBLE``
``STRING`` ``VARCHAR``
``BYTES`` ``VARBINARY``
``JSON`` ``JSON``
``INT_ARRAY`` ``VARCHAR``
``LONG_ARRAY`` ``VARCHAR``
``FLOAT_ARRAY`` ``VARCHAR``
Expand Down
7 changes: 7 additions & 0 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,13 @@
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.spi.data.FieldSpec;

import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.plugin.pinot.PinotMetadata.PINOT_COLUMN_NAME_PROPERTY;
import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.quoteIdentifier;
import static java.lang.String.format;
Expand Down Expand Up @@ -94,49 +83,6 @@ public static PinotColumnHandle fromColumnMetadata(ColumnMetadata columnMetadata
return new PinotColumnHandle(columnName, columnMetadata.getType());
}

public static Type getTrinoTypeFromPinotType(FieldSpec field)
{
    // Multi-value Pinot fields are surfaced to Trino as arrays of the element type.
    Type elementType = getTrinoTypeFromPinotType(field.getDataType());
    if (!field.isSingleValueField()) {
        return new ArrayType(elementType);
    }
    return elementType;
}

public static Type getTrinoTypeFromPinotType(TransformResultMetadata transformResultMetadata)
{
    // Multi-value transform results are surfaced to Trino as arrays of the element type.
    Type elementType = getTrinoTypeFromPinotType(transformResultMetadata.getDataType());
    return transformResultMetadata.isSingleValue() ? elementType : new ArrayType(elementType);
}

public static Type getTrinoTypeFromPinotType(FieldSpec.DataType dataType)
{
    // Maps each scalar Pinot data type to its Trino counterpart; any type without
    // a mapping is rejected with PINOT_UNSUPPORTED_COLUMN_TYPE.
    switch (dataType) {
        case BOOLEAN:
            return BooleanType.BOOLEAN;
        case FLOAT:
            return RealType.REAL;
        case DOUBLE:
            return DoubleType.DOUBLE;
        case INT:
            return IntegerType.INTEGER;
        case LONG:
            return BigintType.BIGINT;
        case STRING:
            return VarcharType.VARCHAR;
        case BYTES:
            return VarbinaryType.VARBINARY;
        default:
            throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType);
    }
}

@JsonProperty
public String getColumnName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.Type;
import org.apache.pinot.spi.data.Schema;

import javax.inject.Inject;
Expand All @@ -76,7 +77,6 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.plugin.pinot.PinotColumnHandle.fromColumnMetadata;
import static io.trino.plugin.pinot.PinotColumnHandle.getTrinoTypeFromPinotType;
import static io.trino.plugin.pinot.PinotSessionProperties.isAggregationPushdownEnabled;
import static io.trino.plugin.pinot.query.AggregateExpression.replaceIdentifier;
import static java.util.Locale.ENGLISH;
Expand All @@ -94,16 +94,19 @@ public class PinotMetadata
private final AggregateFunctionRewriter<AggregateExpression, Void> aggregateFunctionRewriter;
private final ImplementCountDistinct implementCountDistinct;
private final PinotClient pinotClient;
private final PinotTypeConverter typeConverter;

@Inject
public PinotMetadata(
PinotClient pinotClient,
PinotConfig pinotConfig,
@ForPinot ExecutorService executor)
@ForPinot ExecutorService executor,
PinotTypeConverter typeConverter)
{
requireNonNull(pinotConfig, "pinot config");
this.pinotClient = requireNonNull(pinotClient, "pinotClient is null");
long metadataCacheExpiryMillis = pinotConfig.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS);
this.typeConverter = requireNonNull(typeConverter, "typeConverter is null");
this.pinotTableColumnCache = buildNonEvictableCache(
CacheBuilder.newBuilder()
.refreshAfterWrite(metadataCacheExpiryMillis, TimeUnit.MILLISECONDS),
Expand Down Expand Up @@ -143,7 +146,7 @@ public List<String> listSchemaNames(ConnectorSession session)
public PinotTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
if (tableName.getTableName().trim().startsWith("select ")) {
DynamicTable dynamicTable = DynamicTableBuilder.buildFromPql(this, tableName, pinotClient);
DynamicTable dynamicTable = DynamicTableBuilder.buildFromPql(this, tableName, pinotClient, typeConverter);
return new PinotTableHandle(tableName.getSchemaName(), dynamicTable.getTableName(), TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable));
}
String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableNameIfExists(tableName.getTableName());
Expand Down Expand Up @@ -288,8 +291,13 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
Map<ColumnHandle, Domain> supported = new HashMap<>();
Map<ColumnHandle, Domain> unsupported = new HashMap<>();
for (Map.Entry<ColumnHandle, Domain> entry : domains.entrySet()) {
// Pinot does not support array literals
if (((PinotColumnHandle) entry.getKey()).getDataType() instanceof ArrayType) {
Type columnType = ((PinotColumnHandle) entry.getKey()).getDataType();
if (columnType instanceof ArrayType) {
// Pinot does not support array literals
unsupported.put(entry.getKey(), entry.getValue());
}
else if (typeConverter.isJsonType(columnType)) {
// Pinot does not support filtering on json values
unsupported.put(entry.getKey(), entry.getValue());
}
else {
Expand Down Expand Up @@ -517,7 +525,7 @@ private List<ColumnMetadata> getPinotColumnMetadataForPinotSchema(Schema pinotTa
.filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL
.map(columnName -> ColumnMetadata.builder()
.setName(columnName)
.setType(getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName)))
.setType(typeConverter.toTrinoType(pinotTableSchema.getFieldSpecFor(columnName)))
.setProperties(ImmutableMap.<String, Object>builder()
.put(PINOT_COLUMN_NAME_PROPERTY, columnName)
.buildOrThrow())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void setup(Binder binder)
binder.bind(PinotSplitManager.class).in(Scopes.SINGLETON);
binder.bind(PinotPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(PinotClient.class).in(Scopes.SINGLETON);
binder.bind(PinotTypeConverter.class).in(Scopes.SINGLETON);
binder.bind(ExecutorService.class).annotatedWith(ForPinot.class)
.toInstance(newCachedThreadPool(threadsNamed("pinot-metadata-fetcher-" + catalogName)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
Expand All @@ -35,6 +36,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.plugin.pinot.decoders.VarbinaryDecoder.toBytes;
Expand Down Expand Up @@ -328,6 +330,10 @@ private Slice getSlice(int rowIndex, int columnIndex)
else if (trinoType instanceof VarbinaryType) {
return Slices.wrappedBuffer(toBytes(currentDataTable.getDataTable().getString(rowIndex, columnIndex)));
}
else if (trinoType.getTypeSignature().getBase() == StandardTypes.JSON) {
String field = currentDataTable.getDataTable().getString(rowIndex, columnIndex);
return jsonParse(getUtf8Slice(field));
}
return Slices.EMPTY_SLICE;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot;

import com.google.common.base.Suppliers;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.spi.data.FieldSpec;

import javax.inject.Inject;

import java.util.Optional;
import java.util.function.Supplier;

import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.StandardTypes.JSON;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class PinotTypeConverter
{
// Supplier is used for compatibility unit tests using TestingTypeManager.
// TestingTypeManager does not support json type.
private final Supplier<Type> jsonTypeSupplier;

@Inject
public PinotTypeConverter(TypeManager typeManager)
{
requireNonNull(typeManager, "typeManager is null");
this.jsonTypeSupplier = Suppliers.memoize(() -> typeManager.getType(new TypeSignature(JSON)));
}

public Type toTrinoType(FieldSpec field)
{
return toTrinoType(field.getDataType(), field.isSingleValueField());
}

public Type toTrinoType(TransformResultMetadata transformResultMetadata)
{
return toTrinoType(transformResultMetadata.getDataType(), transformResultMetadata.isSingleValue());
}

private Type toTrinoType(FieldSpec.DataType dataType, boolean isSingleValue)
{
Type type = toTrinoType(dataType);
if (isSingleValue) {
return type;
}
return new ArrayType(type);
}

private Type toTrinoType(FieldSpec.DataType dataType)
{
switch (dataType) {
case BOOLEAN:
return BooleanType.BOOLEAN;
case FLOAT:
return RealType.REAL;
case DOUBLE:
return DoubleType.DOUBLE;
case INT:
return IntegerType.INTEGER;
case LONG:
return BigintType.BIGINT;
case STRING:
return VarcharType.VARCHAR;
case JSON:
return jsonTypeSupplier.get();
case BYTES:
return VarbinaryType.VARBINARY;
default:
break;
}
throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType);
}

public Type toTrinoType(DataSchema.ColumnDataType columnDataType)
{
switch (columnDataType) {
case INT:
return INTEGER;
case LONG:
return BIGINT;
case FLOAT:
return REAL;
case DOUBLE:
return DOUBLE;
case STRING:
return VARCHAR;
case JSON:
return jsonTypeSupplier.get();
case BYTES:
return VARBINARY;
case INT_ARRAY:
return new ArrayType(INTEGER);
case LONG_ARRAY:
return new ArrayType(BIGINT);
case DOUBLE_ARRAY:
return new ArrayType(DOUBLE);
case STRING_ARRAY:
return new ArrayType(VARCHAR);
default:
break;
}
throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + columnDataType);
}

public boolean isJsonType(Type type)
{
requireNonNull(type, "type is null");
return type.equals(jsonTypeSupplier.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;

import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.spi.type.StandardTypes.JSON;
import static java.util.Objects.requireNonNull;

public class DecoderFactory
Expand Down Expand Up @@ -64,6 +65,9 @@ else if (type instanceof ArrayType) {
else if (type instanceof VarbinaryType) {
return new VarbinaryDecoder();
}
else if (type.getTypeSignature().getBase().equals(JSON)) {
return new JsonDecoder();
}
else {
return new VarcharDecoder();
}
Expand Down
Loading