diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst
index eecac02b6b5c..92a39963dd2e 100644
--- a/docs/src/main/sphinx/connector/pinot.rst
+++ b/docs/src/main/sphinx/connector/pinot.rst
@@ -167,6 +167,7 @@ Pinot Trino
``DOUBLE`` ``DOUBLE``
``STRING`` ``VARCHAR``
``BYTES`` ``VARBINARY``
+``JSON`` ``JSON``
``INT_ARRAY`` ``VARCHAR``
``LONG_ARRAY`` ``VARCHAR``
``FLOAT_ARRAY`` ``VARCHAR``
diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml
index f5a190ebe53a..145b4b00b43b 100755
--- a/plugin/trino-pinot/pom.xml
+++ b/plugin/trino-pinot/pom.xml
@@ -521,6 +521,13 @@
test
+
+ io.trino
+ trino-spi
+ test-jar
+ test
+
+
io.trino
trino-testing
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java
index 3d0b0c406b4d..28e98b631553 100755
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java
@@ -18,24 +18,13 @@
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
-import io.trino.spi.type.ArrayType;
-import io.trino.spi.type.BigintType;
-import io.trino.spi.type.BooleanType;
-import io.trino.spi.type.DoubleType;
-import io.trino.spi.type.IntegerType;
-import io.trino.spi.type.RealType;
import io.trino.spi.type.Type;
-import io.trino.spi.type.VarbinaryType;
-import io.trino.spi.type.VarcharType;
-import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.spi.data.FieldSpec;
import java.util.Objects;
import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
-import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.plugin.pinot.PinotMetadata.PINOT_COLUMN_NAME_PROPERTY;
import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.quoteIdentifier;
import static java.lang.String.format;
@@ -94,49 +83,6 @@ public static PinotColumnHandle fromColumnMetadata(ColumnMetadata columnMetadata
return new PinotColumnHandle(columnName, columnMetadata.getType());
}
- public static Type getTrinoTypeFromPinotType(FieldSpec field)
- {
- Type type = getTrinoTypeFromPinotType(field.getDataType());
- if (field.isSingleValueField()) {
- return type;
- }
- else {
- return new ArrayType(type);
- }
- }
-
- public static Type getTrinoTypeFromPinotType(TransformResultMetadata transformResultMetadata)
- {
- Type type = getTrinoTypeFromPinotType(transformResultMetadata.getDataType());
- if (transformResultMetadata.isSingleValue()) {
- return type;
- }
- return new ArrayType(type);
- }
-
- public static Type getTrinoTypeFromPinotType(FieldSpec.DataType dataType)
- {
- switch (dataType) {
- case BOOLEAN:
- return BooleanType.BOOLEAN;
- case FLOAT:
- return RealType.REAL;
- case DOUBLE:
- return DoubleType.DOUBLE;
- case INT:
- return IntegerType.INTEGER;
- case LONG:
- return BigintType.BIGINT;
- case STRING:
- return VarcharType.VARCHAR;
- case BYTES:
- return VarbinaryType.VARBINARY;
- default:
- break;
- }
- throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType);
- }
-
@JsonProperty
public String getColumnName()
{
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
index ac567dd7052b..7b59f7afea81 100755
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
@@ -54,6 +54,7 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.Type;
import org.apache.pinot.spi.data.Schema;
import javax.inject.Inject;
@@ -76,7 +77,6 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.plugin.pinot.PinotColumnHandle.fromColumnMetadata;
-import static io.trino.plugin.pinot.PinotColumnHandle.getTrinoTypeFromPinotType;
import static io.trino.plugin.pinot.PinotSessionProperties.isAggregationPushdownEnabled;
import static io.trino.plugin.pinot.query.AggregateExpression.replaceIdentifier;
import static java.util.Locale.ENGLISH;
@@ -94,16 +94,19 @@ public class PinotMetadata
private final AggregateFunctionRewriter aggregateFunctionRewriter;
private final ImplementCountDistinct implementCountDistinct;
private final PinotClient pinotClient;
+ private final PinotTypeConverter typeConverter;
@Inject
public PinotMetadata(
PinotClient pinotClient,
PinotConfig pinotConfig,
- @ForPinot ExecutorService executor)
+ @ForPinot ExecutorService executor,
+ PinotTypeConverter typeConverter)
{
requireNonNull(pinotConfig, "pinot config");
this.pinotClient = requireNonNull(pinotClient, "pinotClient is null");
long metadataCacheExpiryMillis = pinotConfig.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS);
+ this.typeConverter = requireNonNull(typeConverter, "typeConverter is null");
this.pinotTableColumnCache = buildNonEvictableCache(
CacheBuilder.newBuilder()
.refreshAfterWrite(metadataCacheExpiryMillis, TimeUnit.MILLISECONDS),
@@ -143,7 +146,7 @@ public List listSchemaNames(ConnectorSession session)
public PinotTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
if (tableName.getTableName().trim().startsWith("select ")) {
- DynamicTable dynamicTable = DynamicTableBuilder.buildFromPql(this, tableName, pinotClient);
+ DynamicTable dynamicTable = DynamicTableBuilder.buildFromPql(this, tableName, pinotClient, typeConverter);
return new PinotTableHandle(tableName.getSchemaName(), dynamicTable.getTableName(), TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable));
}
String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableNameIfExists(tableName.getTableName());
@@ -288,8 +291,13 @@ public Optional> applyFilter(C
Map supported = new HashMap<>();
Map unsupported = new HashMap<>();
for (Map.Entry entry : domains.entrySet()) {
- // Pinot does not support array literals
- if (((PinotColumnHandle) entry.getKey()).getDataType() instanceof ArrayType) {
+ Type columnType = ((PinotColumnHandle) entry.getKey()).getDataType();
+ if (columnType instanceof ArrayType) {
+ // Pinot does not support array literals
+ unsupported.put(entry.getKey(), entry.getValue());
+ }
+ else if (typeConverter.isJsonType(columnType)) {
+ // Pinot does not support filtering on json values
unsupported.put(entry.getKey(), entry.getValue());
}
else {
@@ -517,7 +525,7 @@ private List getPinotColumnMetadataForPinotSchema(Schema pinotTa
.filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL
.map(columnName -> ColumnMetadata.builder()
.setName(columnName)
- .setType(getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName)))
+ .setType(typeConverter.toTrinoType(pinotTableSchema.getFieldSpecFor(columnName)))
.setProperties(ImmutableMap.builder()
.put(PINOT_COLUMN_NAME_PROPERTY, columnName)
.buildOrThrow())
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java
index e7709f8ba119..096c1494b405 100755
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java
@@ -77,6 +77,7 @@ public void setup(Binder binder)
binder.bind(PinotSplitManager.class).in(Scopes.SINGLETON);
binder.bind(PinotPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(PinotClient.class).in(Scopes.SINGLETON);
+ binder.bind(PinotTypeConverter.class).in(Scopes.SINGLETON);
binder.bind(ExecutorService.class).annotatedWith(ForPinot.class)
.toInstance(newCachedThreadPool(threadsNamed("pinot-metadata-fetcher-" + catalogName)));
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
index fb74a017c221..7e35f909bc9e 100755
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
@@ -23,6 +23,7 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
@@ -35,6 +36,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.plugin.pinot.decoders.VarbinaryDecoder.toBytes;
@@ -328,6 +330,10 @@ private Slice getSlice(int rowIndex, int columnIndex)
else if (trinoType instanceof VarbinaryType) {
return Slices.wrappedBuffer(toBytes(currentDataTable.getDataTable().getString(rowIndex, columnIndex)));
}
+ else if (trinoType.getTypeSignature().getBase() == StandardTypes.JSON) {
+ String field = currentDataTable.getDataTable().getString(rowIndex, columnIndex);
+ return jsonParse(getUtf8Slice(field));
+ }
return Slices.EMPTY_SLICE;
}
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java
new file mode 100644
index 000000000000..98fb687ef923
--- /dev/null
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTypeConverter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.pinot;
+
+import com.google.common.base.Suppliers;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
+import io.trino.spi.type.TypeSignature;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+
+import javax.inject.Inject;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.StandardTypes.JSON;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.util.Objects.requireNonNull;
+
+public class PinotTypeConverter
+{
+ // Supplier is used for compatibility unit tests using TestingTypeManager.
+ // TestingTypeManager does not support json type.
+ private final Supplier jsonTypeSupplier;
+
+ @Inject
+ public PinotTypeConverter(TypeManager typeManager)
+ {
+ requireNonNull(typeManager, "typeManager is null");
+ this.jsonTypeSupplier = Suppliers.memoize(() -> typeManager.getType(new TypeSignature(JSON)));
+ }
+
+ public Type toTrinoType(FieldSpec field)
+ {
+ return toTrinoType(field.getDataType(), field.isSingleValueField());
+ }
+
+ public Type toTrinoType(TransformResultMetadata transformResultMetadata)
+ {
+ return toTrinoType(transformResultMetadata.getDataType(), transformResultMetadata.isSingleValue());
+ }
+
+ private Type toTrinoType(FieldSpec.DataType dataType, boolean isSingleValue)
+ {
+ Type type = toTrinoType(dataType);
+ if (isSingleValue) {
+ return type;
+ }
+ return new ArrayType(type);
+ }
+
+ private Type toTrinoType(FieldSpec.DataType dataType)
+ {
+ switch (dataType) {
+ case BOOLEAN:
+ return BooleanType.BOOLEAN;
+ case FLOAT:
+ return RealType.REAL;
+ case DOUBLE:
+ return DoubleType.DOUBLE;
+ case INT:
+ return IntegerType.INTEGER;
+ case LONG:
+ return BigintType.BIGINT;
+ case STRING:
+ return VarcharType.VARCHAR;
+ case JSON:
+ return jsonTypeSupplier.get();
+ case BYTES:
+ return VarbinaryType.VARBINARY;
+ default:
+ break;
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType);
+ }
+
+ public Type toTrinoType(DataSchema.ColumnDataType columnDataType)
+ {
+ switch (columnDataType) {
+ case INT:
+ return INTEGER;
+ case LONG:
+ return BIGINT;
+ case FLOAT:
+ return REAL;
+ case DOUBLE:
+ return DOUBLE;
+ case STRING:
+ return VARCHAR;
+ case JSON:
+ return jsonTypeSupplier.get();
+ case BYTES:
+ return VARBINARY;
+ case INT_ARRAY:
+ return new ArrayType(INTEGER);
+ case LONG_ARRAY:
+ return new ArrayType(BIGINT);
+ case DOUBLE_ARRAY:
+ return new ArrayType(DOUBLE);
+ case STRING_ARRAY:
+ return new ArrayType(VARCHAR);
+ default:
+ break;
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + columnDataType);
+ }
+
+ public boolean isJsonType(Type type)
+ {
+ requireNonNull(type, "type is null");
+ return type.equals(jsonTypeSupplier.get());
+ }
+}
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java
index dbc957e69f6e..ffc72dd8d668 100644
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java
@@ -27,6 +27,7 @@
import java.util.Optional;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
+import static io.trino.spi.type.StandardTypes.JSON;
import static java.util.Objects.requireNonNull;
public class DecoderFactory
@@ -64,6 +65,9 @@ else if (type instanceof ArrayType) {
else if (type instanceof VarbinaryType) {
return new VarbinaryDecoder();
}
+ else if (type.getTypeSignature().getBase().equals(JSON)) {
+ return new JsonDecoder();
+ }
else {
return new VarcharDecoder();
}
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/JsonDecoder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/JsonDecoder.java
new file mode 100644
index 000000000000..dcd0280be60c
--- /dev/null
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/JsonDecoder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.pinot.decoders;
+
+import io.airlift.slice.Slice;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.BlockBuilder;
+
+import java.util.function.Supplier;
+
+import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse;
+import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH;
+import static java.lang.String.format;
+
+public class JsonDecoder
+ implements Decoder
+{
+ @Override
+ public void decode(Supplier