Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.ArrayType;
Expand All @@ -28,17 +29,16 @@
import io.trino.spi.type.VarcharType;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.plugin.pinot.PinotMetadata.PINOT_COLUMN_NAME_PROPERTY;
import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.quoteIdentifier;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class PinotColumnHandle
Expand Down Expand Up @@ -88,12 +88,10 @@ public static PinotColumnHandle fromNonAggregateColumnHandle(PinotColumnHandle c
return new PinotColumnHandle(columnHandle.getColumnName(), columnHandle.getDataType(), quoteIdentifier(columnHandle.getColumnName()), false, false, true, Optional.empty(), Optional.empty());
}

public static List<PinotColumnHandle> getPinotColumnsForPinotSchema(Schema pinotTableSchema)
public static PinotColumnHandle fromColumnMetadata(ColumnMetadata columnMetadata)
{
return pinotTableSchema.getColumnNames().stream()
.filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL
.map(columnName -> new PinotColumnHandle(columnName, getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName))))
.collect(toImmutableList());
String columnName = (String) requireNonNull(columnMetadata.getProperties().get(PINOT_COLUMN_NAME_PROPERTY), format("Missing required column property '%s'", PINOT_COLUMN_NAME_PROPERTY));
return new PinotColumnHandle(columnName, columnMetadata.getType());
}

public static Type getTrinoTypeFromPinotType(FieldSpec field)
Expand Down Expand Up @@ -204,7 +202,13 @@ public Optional<String> getPushedDownAggregateFunctionArgument()

public ColumnMetadata getColumnMetadata()
{
return new ColumnMetadata(getColumnName(), getDataType());
return ColumnMetadata.builder()
.setName(columnName)
.setType(dataType)
.setProperties(ImmutableMap.<String, Object>builder()
.put(PINOT_COLUMN_NAME_PROPERTY, columnName)
.buildOrThrow())
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.plugin.pinot.PinotColumnHandle.getPinotColumnsForPinotSchema;
import static io.trino.plugin.pinot.PinotColumnHandle.fromColumnMetadata;
import static io.trino.plugin.pinot.PinotColumnHandle.getTrinoTypeFromPinotType;
import static io.trino.plugin.pinot.PinotSessionProperties.isAggregationPushdownEnabled;
import static io.trino.plugin.pinot.query.AggregateExpression.replaceIdentifier;
import static java.util.Locale.ENGLISH;
Expand All @@ -87,13 +88,14 @@
public class PinotMetadata
implements ConnectorMetadata
{
public static final String PINOT_COLUMN_NAME_PROPERTY = "pinotColumnName";

private static final Logger log = Logger.get(PinotMetadata.class);

private static final Object ALL_TABLES_CACHE_KEY = new Object();
private static final String SCHEMA_NAME = "default";
private static final String PINOT_COLUMN_NAME_PROPERTY = "pinotColumnName";

private final NonEvictableLoadingCache<String, List<PinotColumnHandle>> pinotTableColumnCache;
private final NonEvictableLoadingCache<String, List<ColumnMetadata>> pinotTableColumnCache;
private final NonEvictableLoadingCache<Object, List<String>> allTablesCache;
private final int maxRowsPerBrokerQuery;
private final AggregateFunctionRewriter<AggregateExpression, Void> aggregateFunctionRewriter;
Expand All @@ -119,11 +121,11 @@ public PinotMetadata(
asyncReloading(new CacheLoader<>()
{
@Override
public List<PinotColumnHandle> load(String tableName)
public List<ColumnMetadata> load(String tableName)
throws Exception
{
Schema tablePinotSchema = pinotClient.getTableSchema(tableName);
return getPinotColumnsForPinotSchema(tablePinotSchema);
return getPinotColumnMetadataForPinotSchema(tablePinotSchema);
}
}, executor));

Expand Down Expand Up @@ -212,17 +214,11 @@ public Map<String, ColumnHandle> getPinotColumnHandles(String tableName)
ImmutableMap.Builder<String, ColumnHandle> columnHandlesBuilder = ImmutableMap.builder();
for (ColumnMetadata columnMetadata : getColumnsMetadata(tableName)) {
columnHandlesBuilder.put(columnMetadata.getName(),
new PinotColumnHandle(getPinotColumnName(columnMetadata), columnMetadata.getType()));
fromColumnMetadata(columnMetadata));
}
return columnHandlesBuilder.buildOrThrow();
}

private static String getPinotColumnName(ColumnMetadata columnMetadata)
{
Object pinotColumnName = requireNonNull(columnMetadata.getProperties().get(PINOT_COLUMN_NAME_PROPERTY), "Pinot column name is missing");
return pinotColumnName.toString();
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
Expand Down Expand Up @@ -353,6 +349,14 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
return Optional.empty();
}

// Do not push aggregations down if a grouping column is an array type.
// Pinot treats each element of array as a grouping key
// See https://github.com/apache/pinot/issues/8353 for more details.
Comment thread
hashhar marked this conversation as resolved.
Outdated
if (getOnlyElement(groupingSets).stream()
.filter(columnHandle -> ((PinotColumnHandle) columnHandle).getDataType() instanceof ArrayType)
.findFirst().isPresent()) {
return Optional.empty();
}
PinotTableHandle tableHandle = (PinotTableHandle) handle;
// If aggregates are present than no further aggregations
// can be pushed down: there are currently no subqueries in pinot.
Expand Down Expand Up @@ -488,7 +492,7 @@ private static PinotColumnHandle resolveAggregateExpressionWithAlias(PinotColumn
}

@VisibleForTesting
public List<PinotColumnHandle> getPinotColumns(String tableName)
public List<ColumnMetadata> getColumnsMetadata(String tableName)
{
String pinotTableName = getPinotTableNameFromTrinoTableName(tableName);
return getFromCache(pinotTableColumnCache, pinotTableName);
Expand Down Expand Up @@ -544,25 +548,20 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
return new ConnectorTableMetadata(tableName, getColumnsMetadata(tableName.getTableName()));
}

private List<ColumnMetadata> getColumnsMetadata(String tableName)
private List<ColumnMetadata> getPinotColumnMetadataForPinotSchema(Schema pinotTableSchema)
{
List<PinotColumnHandle> columns = getPinotColumns(tableName);
return columns.stream()
.map(PinotMetadata::createPinotColumnMetadata)
return pinotTableSchema.getColumnNames().stream()
.filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL
.map(columnName -> ColumnMetadata.builder()
.setName(columnName)
.setType(getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName)))
.setProperties(ImmutableMap.<String, Object>builder()
.put(PINOT_COLUMN_NAME_PROPERTY, columnName)
.buildOrThrow())
.build())
.collect(toImmutableList());
}

private static ColumnMetadata createPinotColumnMetadata(PinotColumnHandle pinotColumn)
{
return ColumnMetadata.builder()
.setName(pinotColumn.getColumnName().toLowerCase(ENGLISH))
.setType(pinotColumn.getDataType())
.setProperties(ImmutableMap.<String, Object>builder()
.put(PINOT_COLUMN_NAME_PROPERTY, pinotColumn.getColumnName())
.buildOrThrow())
.build();
}

private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
{
if (prefix.getSchema().isEmpty() || prefix.getTable().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.trino.plugin.pinot.query.DynamicTable;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;

import java.util.Objects;
Expand Down Expand Up @@ -89,11 +88,6 @@ public Optional<DynamicTable> getQuery()
return query;
}

public SchemaTableName toSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.util.Optional;

import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static java.util.Objects.requireNonNull;

Expand All @@ -35,31 +34,6 @@ private DecoderFactory()
{
}

protected static final String PINOT_INFINITY = "∞";
protected static final String PINOT_POSITIVE_INFINITY = "+" + PINOT_INFINITY;
protected static final String PINOT_NEGATIVE_INFINITY = "-" + PINOT_INFINITY;

protected static final Double TRINO_INFINITY = Double.POSITIVE_INFINITY;
protected static final Double TRINO_NEGATIVE_INFINITY = Double.NEGATIVE_INFINITY;

public static Double parseDouble(String value)
{
try {
requireNonNull(value, "value is null");
return Double.valueOf(value);
}
catch (NumberFormatException ne) {
switch (value) {
case PINOT_INFINITY:
case PINOT_POSITIVE_INFINITY:
return TRINO_INFINITY;
case PINOT_NEGATIVE_INFINITY:
return TRINO_NEGATIVE_INFINITY;
}
throw new PinotException(PINOT_DECODE_ERROR, Optional.empty(), "Cannot decode double value from pinot " + value, ne);
}
}

public static Decoder createDecoder(Type type)
{
requireNonNull(type, "type is null");
Expand Down
Loading