diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst
index 52c9ff5943d8..fcf3fa93e442 100644
--- a/docs/src/main/sphinx/connector/pinot.rst
+++ b/docs/src/main/sphinx/connector/pinot.rst
@@ -10,7 +10,7 @@ Requirements
To connect to Pinot, you need:
-* Pinot 0.1.0 or higher.
+* Pinot 0.8.0 or higher.
* Network access from the Trino coordinator and workers to the Pinot controller
nodes. Port 8098 is the default port.
diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml
index a07a55e6d7d1..b28d6edd8806 100755
--- a/plugin/trino-pinot/pom.xml
+++ b/plugin/trino-pinot/pom.xml
@@ -14,7 +14,7 @@
${project.parent.basedir}
- 0.6.0
+ 0.8.0
@@ -103,12 +103,6 @@
guice
-
- com.yammer.metrics
- metrics-core
- 2.2.0
-
-
commons-codec
commons-codec
@@ -325,10 +319,18 @@
com.fasterxml.jackson.core
jackson-annotations
+
+ jakarta.ws.rs
+ jakarta.ws.rs-api
+
javax.validation
validation-api
+
+ org.glassfish.hk2.external
+ jakarta.inject
+
org.apache.lucene
lucene-analyzers-common
@@ -340,6 +342,28 @@
+
+ org.apache.pinot
+ pinot-segment-local
+ ${dep.pinot.version}
+
+
+ org.apache.lucene
+ lucene-analyzers-common
+
+
+ org.apache.lucene
+ lucene-core
+
+
+
+
+
+ org.apache.pinot
+ pinot-segment-spi
+ ${dep.pinot.version}
+
+
org.apache.pinot
pinot-spi
@@ -382,6 +406,13 @@
runtime
+
+ org.apache.pinot
+ pinot-yammer
+ ${dep.pinot.version}
+ runtime
+
+
io.trino
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotBrokerPageSource.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotBrokerPageSource.java
index 5fc1e8f46b5b..e422bae25a21 100644
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotBrokerPageSource.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotBrokerPageSource.java
@@ -17,7 +17,7 @@
import io.trino.plugin.pinot.client.PinotClient.BrokerResultRow;
import io.trino.plugin.pinot.decoders.Decoder;
import io.trino.plugin.pinot.decoders.DecoderFactory;
-import io.trino.plugin.pinot.query.PinotQuery;
+import io.trino.plugin.pinot.query.PinotQueryInfo;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
@@ -39,7 +39,7 @@
public class PinotBrokerPageSource
implements ConnectorPageSource
{
- private final PinotQuery query;
+ private final PinotQueryInfo query;
private final PinotClient pinotClient;
private final ConnectorSession session;
private final List columnHandles;
@@ -56,7 +56,7 @@ public class PinotBrokerPageSource
public PinotBrokerPageSource(
ConnectorSession session,
- PinotQuery query,
+ PinotQueryInfo query,
List columnHandles,
PinotClient pinotClient,
int limitForBrokerQueries)
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumn.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumn.java
deleted file mode 100755
index e63dcbe493ec..000000000000
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumn.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.pinot;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.trino.spi.type.ArrayType;
-import io.trino.spi.type.BigintType;
-import io.trino.spi.type.BooleanType;
-import io.trino.spi.type.DoubleType;
-import io.trino.spi.type.IntegerType;
-import io.trino.spi.type.RealType;
-import io.trino.spi.type.Type;
-import io.trino.spi.type.VarbinaryType;
-import io.trino.spi.type.VarcharType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
-import static java.util.Objects.requireNonNull;
-
-public class PinotColumn
-{
- private final String name;
- private final Type type;
-
- @JsonCreator
- public PinotColumn(
- @JsonProperty("name") String name,
- @JsonProperty("type") Type type)
- {
- checkArgument(!isNullOrEmpty(name), "name is null or is empty");
- this.name = name;
- this.type = requireNonNull(type, "type is null");
- }
-
- @JsonProperty
- public String getName()
- {
- return name;
- }
-
- @JsonProperty
- public Type getType()
- {
- return type;
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(name, type);
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
-
- PinotColumn other = (PinotColumn) obj;
- return Objects.equals(this.name, other.name) && Objects.equals(this.type, other.type);
- }
-
- @Override
- public String toString()
- {
- return name + ":" + type;
- }
-
- public static List getPinotColumnsForPinotSchema(Schema pinotTableSchema)
- {
- return pinotTableSchema.getColumnNames().stream()
- .filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL
- .map(columnName -> new PinotColumn(columnName, getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName))))
- .collect(toImmutableList());
- }
-
- public static Type getTrinoTypeFromPinotType(FieldSpec field)
- {
- Type type = getTrinoTypeFromPinotType(field.getDataType());
- if (field.isSingleValueField()) {
- return type;
- }
- else {
- return new ArrayType(type);
- }
- }
-
- public static Type getTrinoTypeFromPinotType(DataType dataType)
- {
- switch (dataType) {
- case BOOLEAN:
- return BooleanType.BOOLEAN;
- case FLOAT:
- return RealType.REAL;
- case DOUBLE:
- return DoubleType.DOUBLE;
- case INT:
- return IntegerType.INTEGER;
- case LONG:
- return BigintType.BIGINT;
- case STRING:
- return VarcharType.VARCHAR;
- case BYTES:
- return VarbinaryType.VARBINARY;
- default:
- break;
- }
- throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType);
- }
-}
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java
index ba7bb18a36ba..7b70ca43b3f8 100755
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java
@@ -17,11 +17,28 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RealType;
import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
+import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.quoteIdentifier;
import static java.util.Objects.requireNonNull;
public class PinotColumnHandle
@@ -29,22 +46,97 @@ public class PinotColumnHandle
{
private final String columnName;
private final Type dataType;
+ private final String expression;
+ private final boolean aliased;
+ private final boolean aggregate;
private final boolean returnNullOnEmptyGroup;
+ private final Optional pushedDownAggregateFunctionName;
+ private final Optional pushedDownAggregateFunctionArgument;
public PinotColumnHandle(String columnName, Type dataType)
{
- this(columnName, dataType, true);
+ this(columnName, dataType, columnName, false, false, true, Optional.empty(), Optional.empty());
}
@JsonCreator
public PinotColumnHandle(
@JsonProperty("columnName") String columnName,
@JsonProperty("dataType") Type dataType,
- @JsonProperty("returnNullOnEmptyGroup") boolean returnNullOnEmptyGroup)
+ @JsonProperty("expression") String expression,
+ @JsonProperty("aliased") boolean aliased,
+ @JsonProperty("aggregate") boolean aggregate,
+ @JsonProperty("returnNullOnEmptyGroup") boolean returnNullOnEmptyGroup,
+ @JsonProperty("pushedDownAggregateFunctionName") Optional pushedDownAggregateFunctionName,
+ @JsonProperty("pushedDownAggregateFunctionArgument") Optional pushedDownAggregateFunctionArgument)
{
this.columnName = requireNonNull(columnName, "columnName is null");
this.dataType = requireNonNull(dataType, "dataType is null");
+ this.expression = requireNonNull(expression, "expression is null");
+ this.aliased = aliased;
+ this.aggregate = aggregate;
this.returnNullOnEmptyGroup = returnNullOnEmptyGroup;
+ requireNonNull(pushedDownAggregateFunctionName, "pushedDownaAggregateFunctionName is null");
+ requireNonNull(pushedDownAggregateFunctionArgument, "pushedDownaAggregateFunctionArgument is null");
+ checkState(pushedDownAggregateFunctionName.isPresent() == pushedDownAggregateFunctionArgument.isPresent(), "Unexpected arguments: Either pushedDownaAggregateFunctionName and pushedDownaAggregateFunctionArgument must both be present or both be empty.");
+ checkState((pushedDownAggregateFunctionName.isPresent() && aggregate) || pushedDownAggregateFunctionName.isEmpty(), "Unexpected arguments: aggregate is false but pushed down aggregation is present");
+ this.pushedDownAggregateFunctionName = pushedDownAggregateFunctionName;
+ this.pushedDownAggregateFunctionArgument = pushedDownAggregateFunctionArgument;
+ }
+
+ public static PinotColumnHandle fromNonAggregateColumnHandle(PinotColumnHandle columnHandle)
+ {
+ return new PinotColumnHandle(columnHandle.getColumnName(), columnHandle.getDataType(), quoteIdentifier(columnHandle.getColumnName()), false, false, true, Optional.empty(), Optional.empty());
+ }
+
+ public static List getPinotColumnsForPinotSchema(Schema pinotTableSchema)
+ {
+ return pinotTableSchema.getColumnNames().stream()
+ .filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL
+ .map(columnName -> new PinotColumnHandle(columnName, getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName))))
+ .collect(toImmutableList());
+ }
+
+ public static Type getTrinoTypeFromPinotType(FieldSpec field)
+ {
+ Type type = getTrinoTypeFromPinotType(field.getDataType());
+ if (field.isSingleValueField()) {
+ return type;
+ }
+ else {
+ return new ArrayType(type);
+ }
+ }
+
+ public static Type getTrinoTypeFromPinotType(TransformResultMetadata transformResultMetadata)
+ {
+ Type type = getTrinoTypeFromPinotType(transformResultMetadata.getDataType());
+ if (transformResultMetadata.isSingleValue()) {
+ return type;
+ }
+ return new ArrayType(type);
+ }
+
+ public static Type getTrinoTypeFromPinotType(FieldSpec.DataType dataType)
+ {
+ switch (dataType) {
+ case BOOLEAN:
+ return BooleanType.BOOLEAN;
+ case FLOAT:
+ return RealType.REAL;
+ case DOUBLE:
+ return DoubleType.DOUBLE;
+ case INT:
+ return IntegerType.INTEGER;
+ case LONG:
+ return BigintType.BIGINT;
+ case STRING:
+ return VarcharType.VARCHAR;
+ case BYTES:
+ return VarbinaryType.VARBINARY;
+ default:
+ break;
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported type conversion for pinot data type: " + dataType);
}
@JsonProperty
@@ -59,9 +151,26 @@ public Type getDataType()
return dataType;
}
- public ColumnMetadata getColumnMetadata()
+ @JsonProperty
+ public String getExpression()
{
- return new ColumnMetadata(getColumnName(), getDataType());
+ return expression;
+ }
+
+ // Keep track of whether this column is aliased, it will determine how the pinot sql query is built
+ // The reason is that pinot parses the broker request into pinot pql but expects pinot sql.
+ // In some cases the parsed pql expression is an invalid sql expression.
+ @JsonProperty
+ public boolean isAliased()
+ {
+ return aliased;
+ }
+
+ // True if this is an aggregate column for both passthrough query and pushed down aggregate expressions.
+ @JsonProperty
+ public boolean isAggregate()
+ {
+ return aggregate;
}
// Some aggregations should return null on empty group, ex. min/max
@@ -72,6 +181,32 @@ public boolean isReturnNullOnEmptyGroup()
return returnNullOnEmptyGroup;
}
+ // If the aggregate expression is pushed down store the function name
+ // If the argument is an alias the pinot expression will use the original
+ // column name in the expression and alias it.
+ //
+ // Example: SELECT MAX(bar) FROM "SELECT foo AS bar FROM table"
+ // Will translate to the pinot query "SELECT MAX(foo) AS \"max(bar)\""
+ //
+ // Note: Pinot omits quotes on the autogenerated column name "max(bar)"
+ @JsonProperty
+ public Optional getPushedDownAggregateFunctionName()
+ {
+ return pushedDownAggregateFunctionName;
+ }
+
+ // See comment for getPushedDownaAggregateFunctionName()
+ @JsonProperty
+ public Optional getPushedDownAggregateFunctionArgument()
+ {
+ return pushedDownAggregateFunctionArgument;
+ }
+
+ public ColumnMetadata getColumnMetadata()
+ {
+ return new ColumnMetadata(getColumnName(), getDataType());
+ }
+
@Override
public boolean equals(Object o)
{
@@ -99,7 +234,12 @@ public String toString()
return toStringHelper(this)
.add("columnName", columnName)
.add("dataType", dataType)
+ .add("expression", expression)
+ .add("aliased", aliased)
+ .add("aggregate", aggregate)
.add("returnNullOnEmptyGroup", returnNullOnEmptyGroup)
+ .add("pushedDownaAggregateFunctionName", pushedDownAggregateFunctionName)
+ .add("pushedDownaAggregateFunctionArgument", pushedDownAggregateFunctionArgument)
.toString();
}
}
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotInsufficientServerResponseException.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotInsufficientServerResponseException.java
index 3a2888d8b1c5..0823fef68fd6 100644
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotInsufficientServerResponseException.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotInsufficientServerResponseException.java
@@ -13,7 +13,7 @@
*/
package io.trino.plugin.pinot;
-import io.trino.plugin.pinot.query.PinotQuery;
+import io.trino.plugin.pinot.query.PinotQueryInfo;
import java.util.Optional;
@@ -23,12 +23,12 @@
public class PinotInsufficientServerResponseException
extends PinotException
{
- public PinotInsufficientServerResponseException(PinotQuery query, int numberOfServersResponded, int numberOfServersQueried)
+ public PinotInsufficientServerResponseException(PinotQueryInfo query, int numberOfServersResponded, int numberOfServersQueried)
{
this(query, format("Only %s out of %s servers responded for query %s", numberOfServersResponded, numberOfServersQueried, query.getQuery()));
}
- public PinotInsufficientServerResponseException(PinotQuery query, String message)
+ public PinotInsufficientServerResponseException(PinotQueryInfo query, String message)
{
super(PINOT_INSUFFICIENT_SERVER_RESPONSE, Optional.of(query.getQuery()), message, true);
}
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
index cac8641167a0..3e92b54bfb4c 100755
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
@@ -24,6 +24,7 @@
import io.trino.plugin.base.expression.AggregateFunctionRewriter;
import io.trino.plugin.base.expression.AggregateFunctionRule;
import io.trino.plugin.pinot.client.PinotClient;
+import io.trino.plugin.pinot.query.AggregateExpression;
import io.trino.plugin.pinot.query.DynamicTable;
import io.trino.plugin.pinot.query.DynamicTableBuilder;
import io.trino.plugin.pinot.query.aggregation.ImplementApproxDistinct;
@@ -37,7 +38,6 @@
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
-import io.trino.spi.connector.ColumnNotFoundException;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
@@ -72,9 +72,11 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.cache.CacheLoader.asyncReloading;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
-import static io.trino.plugin.pinot.PinotColumn.getPinotColumnsForPinotSchema;
+import static io.trino.plugin.pinot.PinotColumnHandle.getPinotColumnsForPinotSchema;
import static io.trino.plugin.pinot.PinotSessionProperties.isAggregationPushdownEnabled;
+import static io.trino.plugin.pinot.query.AggregateExpression.replaceIdentifier;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.function.UnaryOperator.identity;
@@ -88,11 +90,12 @@ public class PinotMetadata
private static final String SCHEMA_NAME = "default";
private static final String PINOT_COLUMN_NAME_PROPERTY = "pinotColumnName";
- private final LoadingCache> pinotTableColumnCache;
+ private final LoadingCache> pinotTableColumnCache;
private final LoadingCache