diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java index 7b70ca43b3f8..4762e9042e37 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.type.ArrayType; @@ -28,17 +29,16 @@ import io.trino.spi.type.VarcharType; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.Schema; -import java.util.List; import java.util.Objects; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; +import static io.trino.plugin.pinot.PinotMetadata.PINOT_COLUMN_NAME_PROPERTY; import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.quoteIdentifier; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class PinotColumnHandle @@ -88,12 +88,10 @@ public static PinotColumnHandle fromNonAggregateColumnHandle(PinotColumnHandle c return new PinotColumnHandle(columnHandle.getColumnName(), columnHandle.getDataType(), quoteIdentifier(columnHandle.getColumnName()), false, false, true, Optional.empty(), Optional.empty()); } - public static List getPinotColumnsForPinotSchema(Schema pinotTableSchema) + public static PinotColumnHandle fromColumnMetadata(ColumnMetadata columnMetadata) { - return pinotTableSchema.getColumnNames().stream() - .filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL - .map(columnName -> new PinotColumnHandle(columnName, getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName)))) - .collect(toImmutableList()); + String columnName = (String) requireNonNull(columnMetadata.getProperties().get(PINOT_COLUMN_NAME_PROPERTY), format("Missing required column property '%s'", PINOT_COLUMN_NAME_PROPERTY)); + return new PinotColumnHandle(columnName, columnMetadata.getType()); } public static Type getTrinoTypeFromPinotType(FieldSpec field) @@ -204,7 +202,13 @@ public Optional getPushedDownAggregateFunctionArgument() public ColumnMetadata getColumnMetadata() { - return new ColumnMetadata(getColumnName(), getDataType()); + return ColumnMetadata.builder() + .setName(columnName) + .setType(dataType) + .setProperties(ImmutableMap.builder() + .put(PINOT_COLUMN_NAME_PROPERTY, columnName) + .buildOrThrow()) + .build(); } @Override diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java index 846bc8c4f0b3..1ef45ed55bef 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java @@ -77,7 +77,8 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache; -import static io.trino.plugin.pinot.PinotColumnHandle.getPinotColumnsForPinotSchema; +import static io.trino.plugin.pinot.PinotColumnHandle.fromColumnMetadata; +import static io.trino.plugin.pinot.PinotColumnHandle.getTrinoTypeFromPinotType; import static io.trino.plugin.pinot.PinotSessionProperties.isAggregationPushdownEnabled; import static io.trino.plugin.pinot.query.AggregateExpression.replaceIdentifier; import static java.util.Locale.ENGLISH; @@ -87,13 +88,14 @@ public class PinotMetadata implements ConnectorMetadata { + public static final String PINOT_COLUMN_NAME_PROPERTY = "pinotColumnName"; + private static final Logger log = Logger.get(PinotMetadata.class); private static final Object ALL_TABLES_CACHE_KEY = new Object(); private static final String SCHEMA_NAME = "default"; - private static final String PINOT_COLUMN_NAME_PROPERTY = "pinotColumnName"; - private final NonEvictableLoadingCache> pinotTableColumnCache; + private final NonEvictableLoadingCache> pinotTableColumnCache; private final NonEvictableLoadingCache> allTablesCache; private final int maxRowsPerBrokerQuery; private final AggregateFunctionRewriter aggregateFunctionRewriter; @@ -119,11 +121,11 @@ public PinotMetadata( asyncReloading(new CacheLoader<>() { @Override - public List load(String tableName) + public List load(String tableName) throws Exception { Schema tablePinotSchema = pinotClient.getTableSchema(tableName); - return getPinotColumnsForPinotSchema(tablePinotSchema); + return getPinotColumnMetadataForPinotSchema(tablePinotSchema); } }, executor)); @@ -212,17 +214,11 @@ public Map getPinotColumnHandles(String tableName) ImmutableMap.Builder columnHandlesBuilder = ImmutableMap.builder(); for (ColumnMetadata columnMetadata : getColumnsMetadata(tableName)) { columnHandlesBuilder.put(columnMetadata.getName(), - new PinotColumnHandle(getPinotColumnName(columnMetadata), columnMetadata.getType())); + fromColumnMetadata(columnMetadata)); } return columnHandlesBuilder.buildOrThrow(); } - private static String getPinotColumnName(ColumnMetadata columnMetadata) - { - Object pinotColumnName = requireNonNull(columnMetadata.getProperties().get(PINOT_COLUMN_NAME_PROPERTY), "Pinot column name is missing"); - return pinotColumnName.toString(); - } - @Override public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { @@ -353,6 +349,14 @@ public Optional> applyAggrega return Optional.empty(); } + // Do not push aggregations down if a grouping column is an array type. + // Pinot treats each element of array as a grouping key + // See https://github.com/apache/pinot/issues/8353 for more details. + if (getOnlyElement(groupingSets).stream() + .filter(columnHandle -> ((PinotColumnHandle) columnHandle).getDataType() instanceof ArrayType) + .findFirst().isPresent()) { + return Optional.empty(); + } PinotTableHandle tableHandle = (PinotTableHandle) handle; // If aggregates are present than no further aggregations // can be pushed down: there are currently no subqueries in pinot. @@ -488,7 +492,7 @@ private static PinotColumnHandle resolveAggregateExpressionWithAlias(PinotColumn } @VisibleForTesting - public List getPinotColumns(String tableName) + public List getColumnsMetadata(String tableName) { String pinotTableName = getPinotTableNameFromTrinoTableName(tableName); return getFromCache(pinotTableColumnCache, pinotTableName); @@ -544,25 +548,20 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) return new ConnectorTableMetadata(tableName, getColumnsMetadata(tableName.getTableName())); } - private List getColumnsMetadata(String tableName) + private List getPinotColumnMetadataForPinotSchema(Schema pinotTableSchema) { - List columns = getPinotColumns(tableName); - return columns.stream() - .map(PinotMetadata::createPinotColumnMetadata) + return pinotTableSchema.getColumnNames().stream() + .filter(columnName -> !columnName.startsWith("$")) // Hidden columns starts with "$", ignore them as we can't use them in PQL + .map(columnName -> ColumnMetadata.builder() + .setName(columnName) + .setType(getTrinoTypeFromPinotType(pinotTableSchema.getFieldSpecFor(columnName))) + .setProperties(ImmutableMap.builder() + .put(PINOT_COLUMN_NAME_PROPERTY, columnName) + .buildOrThrow()) + .build()) .collect(toImmutableList()); } - private static ColumnMetadata createPinotColumnMetadata(PinotColumnHandle pinotColumn) - { - return ColumnMetadata.builder() - .setName(pinotColumn.getColumnName().toLowerCase(ENGLISH)) - .setType(pinotColumn.getDataType()) - .setProperties(ImmutableMap.builder() - .put(PINOT_COLUMN_NAME_PROPERTY, pinotColumn.getColumnName()) - .buildOrThrow()) - .build(); - } - private List listTables(ConnectorSession session, SchemaTablePrefix prefix) { if (prefix.getSchema().isEmpty() || prefix.getTable().isEmpty()) { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTableHandle.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTableHandle.java index 948ee50e535a..8f6906791956 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTableHandle.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotTableHandle.java @@ -18,7 +18,6 @@ import io.trino.plugin.pinot.query.DynamicTable; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorTableHandle; -import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; import java.util.Objects; @@ -89,11 +88,6 @@ public Optional getQuery() return query; } - public SchemaTableName toSchemaTableName() - { - return new SchemaTableName(schemaName, tableName); - } - @Override public boolean equals(Object o) { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java index b3be6584fa25..96b235d9bc86 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java @@ -25,7 +25,6 @@ import java.util.Optional; -import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; import static java.util.Objects.requireNonNull; @@ -35,31 +34,6 @@ private DecoderFactory() { } - protected static final String PINOT_INFINITY = "∞"; - protected static final String PINOT_POSITIVE_INFINITY = "+" + PINOT_INFINITY; - protected static final String PINOT_NEGATIVE_INFINITY = "-" + PINOT_INFINITY; - - protected static final Double TRINO_INFINITY = Double.POSITIVE_INFINITY; - protected static final Double TRINO_NEGATIVE_INFINITY = Double.NEGATIVE_INFINITY; - - public static Double parseDouble(String value) - { - try { - requireNonNull(value, "value is null"); - return Double.valueOf(value); - } - catch (NumberFormatException ne) { - switch (value) { - case PINOT_INFINITY: - case PINOT_POSITIVE_INFINITY: - return TRINO_INFINITY; - case PINOT_NEGATIVE_INFINITY: - return TRINO_NEGATIVE_INFINITY; - } - throw new PinotException(PINOT_DECODE_ERROR, Optional.empty(), "Cannot decode double value from pinot " + value, ne); - } - } - public static Decoder createDecoder(Type type) { requireNonNull(type, "type is null"); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java index 28964f347adf..cb12806bfeff 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java @@ -30,19 +30,24 @@ import org.apache.pinot.common.request.context.OrderByExpressionContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.reduce.PostAggregationHandler; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.Set; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.pinot.PinotColumnHandle.fromNonAggregateColumnHandle; import static io.trino.plugin.pinot.PinotColumnHandle.getTrinoTypeFromPinotType; +import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; import static io.trino.plugin.pinot.query.PinotExpressionRewriter.rewriteExpression; import static io.trino.plugin.pinot.query.PinotPatterns.WILDCARD; @@ -57,12 +62,17 @@ import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; +import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNT; +import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTHLL; +import static org.apache.pinot.segment.spi.AggregationFunctionType.getAggregationFunctionType; public final class DynamicTableBuilder { private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler(); public static final String OFFLINE_SUFFIX = "_OFFLINE"; public static final String REALTIME_SUFFIX = "_REALTIME"; + private static final Set NON_NULL_ON_EMPTY_AGGREGATIONS = EnumSet.of(COUNT, DISTINCTCOUNT, DISTINCTCOUNTHLL); private DynamicTableBuilder() { @@ -84,14 +94,12 @@ public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTable PinotTypeResolver pinotTypeResolver = new PinotTypeResolver(pinotClient, pinotTableName); List selectColumns = ImmutableList.of(); - ImmutableMap.Builder aggregateTypesBuilder = ImmutableMap.builder(); + Map aggregateTypes = ImmutableMap.of(); if (queryContext.getAggregationFunctions() != null) { checkState(queryContext.getAggregationFunctions().length > 0, "Aggregation Functions is empty"); - for (AggregationFunction aggregationFunction : queryContext.getAggregationFunctions()) { - aggregateTypesBuilder.put(aggregationFunction.getResultColumnName(), toTrinoType(aggregationFunction.getFinalResultColumnType())); - } + aggregateTypes = getAggregateTypes(schemaTableName, queryContext, columnHandles); } - Map aggregateTypes = aggregateTypesBuilder.buildOrThrow(); + if (queryContext.getSelectExpressions() != null) { checkState(!queryContext.getSelectExpressions().isEmpty(), "Pinot selections is empty"); selectColumns = getPinotColumns(schemaTableName, queryContext.getSelectExpressions(), queryContext.getAliasList(), columnHandles, pinotTypeResolver, aggregateTypes); @@ -150,7 +158,7 @@ private static Type toTrinoType(DataSchema.ColumnDataType columnDataType) throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + columnDataType); } - private static List getPinotColumns(SchemaTableName schemaTableName, List expressions, List aliases, Map columnHandles, PinotTypeResolver pinotTypeResolver, Map aggregateTypes) + private static List getPinotColumns(SchemaTableName schemaTableName, List expressions, List aliases, Map columnHandles, PinotTypeResolver pinotTypeResolver, Map aggregateTypes) { ImmutableList.Builder pinotColumnsBuilder = ImmutableList.builder(); for (int index = 0; index < expressions.size(); index++) { @@ -170,22 +178,27 @@ private static List getPinotColumns(SchemaTableName schemaTab return pinotColumnsBuilder.build(); } - private static PinotColumnHandle getPinotColumnHandle(SchemaTableName schemaTableName, ExpressionContext expressionContext, Optional alias, Map columnHandles, PinotTypeResolver pinotTypeResolver, Map aggregateTypes) + private static PinotColumnHandle getPinotColumnHandle(SchemaTableName schemaTableName, ExpressionContext expressionContext, Optional alias, Map columnHandles, PinotTypeResolver pinotTypeResolver, Map aggregateTypes) { ExpressionContext rewritten = rewriteExpression(schemaTableName, expressionContext, columnHandles); // If there is no alias, pinot autogenerates the column name: String columnName = rewritten.toString(); String pinotExpression = formatExpression(schemaTableName, rewritten); Type trinoType; - boolean isAggregate = isAggregate(rewritten); + boolean isAggregate = hasAggregate(rewritten); if (isAggregate) { - trinoType = requireNonNull(aggregateTypes.get(columnName.toLowerCase(ENGLISH)), format("Unexpected aggregate expression: '%s'", rewritten)); + trinoType = requireNonNull(aggregateTypes.get(columnName).getTrinoType(), format("Unexpected aggregate expression: '%s'", rewritten)); + // For aggregation queries, the column name is set by the schema returned from PostAggregationHandler, see getAggregateTypes + columnName = aggregateTypes.get(columnName).getPinotColumnName(); } else { trinoType = getTrinoTypeFromPinotType(pinotTypeResolver.resolveExpressionType(rewritten, schemaTableName, columnHandles)); + if (!aggregateTypes.isEmpty() && trinoType instanceof ArrayType) { + trinoType = ((ArrayType) trinoType).getElementType(); + } } - return new PinotColumnHandle(alias.orElse(columnName), trinoType, pinotExpression, alias.isPresent(), isAggregate, true, Optional.empty(), Optional.empty()); + return new PinotColumnHandle(alias.orElse(columnName), trinoType, pinotExpression, alias.isPresent(), isAggregate, isReturnNullOnEmptyGroup(expressionContext), Optional.empty(), Optional.empty()); } private static Optional getAlias(List aliases, int index) @@ -202,6 +215,81 @@ private static boolean isAggregate(ExpressionContext expressionContext) return expressionContext.getType() == ExpressionContext.Type.FUNCTION && expressionContext.getFunction().getType() == FunctionContext.Type.AGGREGATION; } + private static boolean hasAggregate(ExpressionContext expressionContext) + { + switch (expressionContext.getType()) { + case IDENTIFIER: + case LITERAL: + return false; + case FUNCTION: + if (isAggregate(expressionContext)) { + return true; + } + for (ExpressionContext argument : expressionContext.getFunction().getArguments()) { + if (hasAggregate(argument)) { + return true; + } + } + return false; + } + throw new PinotException(PINOT_EXCEPTION, Optional.empty(), format("Unsupported expression type '%s'", expressionContext.getType())); + } + + private static Map getAggregateTypes(SchemaTableName schemaTableName, QueryContext queryContext, Map columnHandles) + { + // A mapping from pinot expression to the returned pinot column name and trino type + // Note: the column name is set by the PostAggregationHandler + List aggregateColumnExpressions = queryContext.getSelectExpressions().stream() + .filter(DynamicTableBuilder::hasAggregate) + .collect(toImmutableList()); + queryContext = new QueryContext.Builder() + .setAliasList(queryContext.getAliasList()) + .setSelectExpressions(aggregateColumnExpressions) + .build(); + DataSchema preAggregationSchema = getPreAggregationDataSchema(queryContext); + PostAggregationHandler postAggregationHandler = new PostAggregationHandler(queryContext, preAggregationSchema); + DataSchema postAggregtionSchema = postAggregationHandler.getResultDataSchema(); + ImmutableMap.Builder aggregationTypesBuilder = ImmutableMap.builder(); + for (int index = 0; index < postAggregtionSchema.size(); index++) { + aggregationTypesBuilder.put( + // ExpressionContext#toString performs quoting of literals + // Quoting of identifiers is not done to match the corresponding column name in the ResultTable returned from Pinot. Quoting will be done by `DynamicTablePqlExtractor`. + rewriteExpression(schemaTableName, + aggregateColumnExpressions.get(index), + columnHandles).toString(), + new PinotColumnNameAndTrinoType( + postAggregtionSchema.getColumnName(index), + toTrinoType(postAggregtionSchema.getColumnDataType(index)))); + } + return aggregationTypesBuilder.buildOrThrow(); + } + + // Extracted from org.apache.pinot.core.query.reduce.AggregationDataTableReducer + private static DataSchema getPreAggregationDataSchema(QueryContext queryContext) + { + AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); + int numAggregationFunctions = aggregationFunctions.length; + String[] columnNames = new String[numAggregationFunctions]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numAggregationFunctions]; + for (int i = 0; i < numAggregationFunctions; i++) { + AggregationFunction aggregationFunction = aggregationFunctions[i]; + columnNames[i] = aggregationFunction.getResultColumnName(); + columnDataTypes[i] = aggregationFunction.getFinalResultColumnType(); + } + return new DataSchema(columnNames, columnDataTypes); + } + + // To keep consistent behavior with pushed down aggregates, only return non null on an empty group + // if the top level function is in NON_NULL_ON_EMPTY_AGGREGATIONS. + // For all other cases, keep the same behavior as Pinot, since likely the same results are expected. + private static boolean isReturnNullOnEmptyGroup(ExpressionContext expressionContext) + { + if (isAggregate(expressionContext)) { + return !NON_NULL_ON_EMPTY_AGGREGATIONS.contains(getAggregationFunctionType(expressionContext.getFunction().getFunctionName())); + } + return true; + } + private static OptionalLong getOffset(QueryContext queryContext) { if (queryContext.getOffset() > 0) { @@ -239,4 +327,26 @@ else if (tableName.toUpperCase(ENGLISH).endsWith(REALTIME_SUFFIX)) { return Optional.empty(); } } + + private static class PinotColumnNameAndTrinoType + { + private final String pinotColumnName; + private final Type trinoType; + + public PinotColumnNameAndTrinoType(String pinotColumnName, Type trinoType) + { + this.pinotColumnName = requireNonNull(pinotColumnName, "pinotColumnName is null"); + this.trinoType = requireNonNull(trinoType, "trinoType is null"); + } + + public String getPinotColumnName() + { + return pinotColumnName; + } + + public Type getTrinoType() + { + return trinoType; + } + } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java index 0231a2044661..36b33819425e 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java @@ -54,7 +54,7 @@ public static String extractPql(DynamicTable table, TupleDomain tu builder.append(table.getTableName()); builder.append(table.getSuffix().orElse("")); - Optional filter = getFilter(table.getFilter(), tupleDomain, columnHandles); + Optional filter = getFilter(table.getFilter(), tupleDomain); if (filter.isPresent()) { builder.append(" where ") .append(filter.get()); @@ -82,9 +82,9 @@ public static String extractPql(DynamicTable table, TupleDomain tu return builder.toString(); } - private static Optional getFilter(Optional filter, TupleDomain tupleDomain, List columnHandles) + private static Optional getFilter(Optional filter, TupleDomain tupleDomain) { - Optional tupleFilter = getFilterClause(tupleDomain, Optional.empty(), columnHandles); + Optional tupleFilter = getFilterClause(tupleDomain, Optional.empty()); if (tupleFilter.isPresent() && filter.isPresent()) { return Optional.of(format("%s AND %s", encloseInParentheses(tupleFilter.get()), encloseInParentheses(filter.get()))); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java index d204ad41af05..3c13188fad61 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java @@ -69,7 +69,7 @@ public static String generatePql(PinotTableHandle tableHandle, List timePredicate, List columnHandles) + private static void generateFilterPql(StringBuilder pqlBuilder, PinotTableHandle tableHandle, Optional timePredicate) { - Optional filterClause = getFilterClause(tableHandle.getConstraint(), timePredicate, columnHandles); + Optional filterClause = getFilterClause(tableHandle.getConstraint(), timePredicate); if (filterClause.isPresent()) { pqlBuilder.append(" WHERE ") .append(filterClause.get()); } } - public static Optional getFilterClause(TupleDomain tupleDomain, Optional timePredicate, List columnHandles) + public static Optional getFilterClause(TupleDomain tupleDomain, Optional timePredicate) { ImmutableList.Builder conjunctsBuilder = ImmutableList.builder(); timePredicate.ifPresent(conjunctsBuilder::add); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementApproxDistinct.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementApproxDistinct.java index 0edde07eee2b..e17c5f56f3c2 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementApproxDistinct.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementApproxDistinct.java @@ -17,6 +17,7 @@ import io.trino.matching.Captures; import io.trino.matching.Pattern; import io.trino.plugin.base.aggregation.AggregateFunctionRule; +import io.trino.plugin.pinot.PinotColumnHandle; import io.trino.plugin.pinot.query.AggregateExpression; import io.trino.spi.connector.AggregateFunction; import io.trino.spi.expression.Variable; @@ -59,6 +60,7 @@ public Pattern getPattern() public Optional rewrite(AggregateFunction aggregateFunction, Captures captures, RewriteContext context) { Variable argument = captures.get(ARGUMENT); - return Optional.of(new AggregateExpression("distinctcounthll", identifierQuote.apply(argument.getName()), false)); + PinotColumnHandle columnHandle = (PinotColumnHandle) context.getAssignment(argument.getName()); + return Optional.of(new AggregateExpression("distinctcounthll", identifierQuote.apply(columnHandle.getColumnName()), false)); } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementAvg.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementAvg.java index 7754c7145aef..df7d7e0b45e9 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementAvg.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementAvg.java @@ -18,6 +18,7 @@ import io.trino.matching.Captures; import io.trino.matching.Pattern; import io.trino.plugin.base.aggregation.AggregateFunctionRule; +import io.trino.plugin.pinot.PinotColumnHandle; import io.trino.plugin.pinot.query.AggregateExpression; import io.trino.spi.connector.AggregateFunction; import io.trino.spi.expression.Variable; @@ -67,6 +68,7 @@ public Pattern getPattern() public Optional rewrite(AggregateFunction aggregateFunction, Captures captures, RewriteContext context) { Variable argument = captures.get(ARGUMENT); - return Optional.of(new AggregateExpression(aggregateFunction.getFunctionName(), identifierQuote.apply(argument.getName()), true)); + PinotColumnHandle columnHandle = (PinotColumnHandle) context.getAssignment(argument.getName()); + return Optional.of(new AggregateExpression(aggregateFunction.getFunctionName(), identifierQuote.apply(columnHandle.getColumnName()), true)); } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java index bddc8863fe53..2845ca4180d6 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java @@ -82,6 +82,7 @@ public abstract class AbstractPinotIntegrationSmokeTest private static final String JSON_TABLE = "my_table"; private static final String RESERVED_KEYWORD_TABLE = "reserved_keyword"; private static final String QUOTES_IN_COLUMN_NAME_TABLE = "quotes_in_column_name"; + private static final String DUPLICATE_VALUES_IN_COLUMNS_TABLE = "duplicate_values_in_columns"; // Use a recent value for updated_at to ensure Pinot doesn't clean up records older than retentionTimeValue as defined in the table specs private static final Instant initialUpdatedAt = Instant.now().minus(Duration.ofDays(1)).truncatedTo(SECONDS); // Use a fixed instant for testing date time functions @@ -296,6 +297,69 @@ protected QueryRunner createQueryRunner() pinot.createSchema(getClass().getClassLoader().getResourceAsStream("quotes_in_column_name_schema.json"), QUOTES_IN_COLUMN_NAME_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("quotes_in_column_name_realtimeSpec.json"), QUOTES_IN_COLUMN_NAME_TABLE); + // Create a table having multiple columns with duplicate values + kafka.createTopic(DUPLICATE_VALUES_IN_COLUMNS_TABLE); + Schema duplicateValuesInColumnsAvroSchema = SchemaBuilder.record(DUPLICATE_VALUES_IN_COLUMNS_TABLE).fields() + .name("dim_col").type().optional().longType() + .name("another_dim_col").type().optional().longType() + .name("string_col").type().optional().stringType() + .name("another_string_col").type().optional().stringType() + .name("metric_col1").type().optional().longType() + .name("metric_col2").type().optional().longType() + .name("updated_at").type().longType().noDefault() + .endRecord(); + + ImmutableList.Builder> duplicateValuesInColumnsRecordsBuilder = ImmutableList.builder(); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key0", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 1000L) + .set("another_dim_col", 1000L) + .set("string_col", "string1") + .set("another_string_col", "string1") + .set("metric_col1", 10L) + .set("metric_col2", 20L) + .set("updated_at", initialUpdatedAt.plusMillis(1000).toEpochMilli()) + .build())); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key1", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 2000L) + .set("another_dim_col", 2000L) + .set("string_col", "string1") + .set("another_string_col", "string1") + .set("metric_col1", 100L) + .set("metric_col2", 200L) + .set("updated_at", initialUpdatedAt.plusMillis(2000).toEpochMilli()) + .build())); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key2", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 3000L) + .set("another_dim_col", 3000L) + .set("string_col", "string1") + .set("another_string_col", "another_string1") + .set("metric_col1", 1000L) + .set("metric_col2", 2000L) + .set("updated_at", initialUpdatedAt.plusMillis(3000).toEpochMilli()) + .build())); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key1", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 4000L) + .set("another_dim_col", 4000L) + .set("string_col", "string2") + .set("another_string_col", "another_string2") + .set("metric_col1", 100L) + .set("metric_col2", 200L) + .set("updated_at", initialUpdatedAt.plusMillis(4000).toEpochMilli()) + .build())); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key2", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 4000L) + .set("another_dim_col", 4001L) + .set("string_col", "string2") + .set("another_string_col", "string2") + .set("metric_col1", 1000L) + .set("metric_col2", 2000L) + .set("updated_at", initialUpdatedAt.plusMillis(5000).toEpochMilli()) + .build())); + + kafka.sendMessages(duplicateValuesInColumnsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); + pinot.createSchema(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_schema.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE); + pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_realtimeSpec.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE); + return PinotQueryRunner.createPinotQueryRunner( ImmutableMap.of(), pinotProperties(pinot), @@ -668,6 +732,20 @@ public void testNonLowerCaseColumnNames() "\" WHERE longcol = 3")) .matches(singleRowValues) .isFullyPushedDown(); + + assertThat(query("SELECT AVG(longcol), MIN(longcol), MAX(longcol), APPROX_DISTINCT(longcol), SUM(longcol)" + + " FROM " + MIXED_CASE_COLUMN_NAMES_TABLE)) + .matches("VALUES (DOUBLE '1.5', BIGINT '0', BIGINT '3', BIGINT '4', BIGINT '6')") + .isFullyPushedDown(); + + assertThat(query("SELECT stringcol, AVG(longcol), MIN(longcol), MAX(longcol), APPROX_DISTINCT(longcol), SUM(longcol)" + + " FROM " + MIXED_CASE_COLUMN_NAMES_TABLE + + " GROUP BY stringcol")) + .matches("VALUES (VARCHAR 'string_0', DOUBLE '0.0', BIGINT '0', BIGINT '0', BIGINT '1', BIGINT '0')," + + " (VARCHAR 'string_1', DOUBLE '1.0', BIGINT '1', BIGINT '1', BIGINT '1', BIGINT '1')," + + " (VARCHAR 'string_2', DOUBLE '2.0', BIGINT '2', BIGINT '2', BIGINT '1', BIGINT '2')," + + " (VARCHAR 'string_3', DOUBLE '3.0', BIGINT '3', BIGINT '3', BIGINT '1', BIGINT '3')") + .isFullyPushedDown(); } @Test @@ -1107,7 +1185,14 @@ public void testAggregationPushdown() " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + - " FROM " + ALL_TYPES_TABLE + " WHERE long_col > 4147483649")).isFullyPushedDown(); + " FROM " + ALL_TYPES_TABLE + " WHERE long_col > 4147483649")) + .isFullyPushedDown(); + + // Ensure that isNullOnEmptyGroup is handled correctly for passthrough queries as well + assertThat(query("SELECT \"count(*)\", \"distinctcounthll(string_col)\", \"distinctcount(string_col)\", \"sum(created_at_seconds)\", \"max(created_at_seconds)\"" + + " FROM \"SELECT count(*), distinctcounthll(string_col), distinctcount(string_col), sum(created_at_seconds), max(created_at_seconds) FROM " + DATE_TIME_FIELDS_TABLE + " WHERE created_at_seconds = 0\"")) + .matches("VALUES (BIGINT '0', BIGINT '0', INTEGER '0', CAST(NULL AS DOUBLE), CAST(NULL AS DOUBLE))") + .isFullyPushedDown(); // Test passthrough queries with no aggregates assertThat(query("SELECT string_col, COUNT(*)," + @@ -1765,4 +1850,89 @@ public void testLimitAndOffsetWithPushedDownAggregates() " (BIGINT '-3147483640', VARCHAR 'string_8400')") .isFullyPushedDown(); } + + @Test + public void testAggregatePassthroughQueriesWithExpressions() + { + assertThat(query("SELECT string_col, sum_metric_col1, count_dup_string_col, ratio_metric_col" + + " FROM \"SELECT string_col, SUM(metric_col1) AS sum_metric_col1, COUNT(DISTINCT another_string_col) AS count_dup_string_col," + + " (SUM(metric_col1) - SUM(metric_col2)) / SUM(metric_col1) AS ratio_metric_col" + + " FROM duplicate_values_in_columns WHERE dim_col = another_dim_col" + + " GROUP BY string_col" + + " ORDER BY string_col\"")) + .matches("VALUES (VARCHAR 'string1', DOUBLE '1110.0', 2, DOUBLE '-1.0')," + + " (VARCHAR 'string2', DOUBLE '100.0', 1, DOUBLE '-1.0')"); + + assertThat(query("SELECT string_col, sum_metric_col1, count_dup_string_col, ratio_metric_col" + + " FROM \"SELECT string_col, SUM(metric_col1) AS sum_metric_col1," + + " COUNT(DISTINCT another_string_col) AS count_dup_string_col," + + " (SUM(metric_col1) - SUM(metric_col2)) / SUM(metric_col1) AS ratio_metric_col" + + " FROM duplicate_values_in_columns WHERE dim_col != another_dim_col" + + " GROUP BY string_col" + + " ORDER BY string_col\"")) + .matches("VALUES (VARCHAR 'string2', DOUBLE '1000.0', 1, DOUBLE '-1.0')"); + + assertThat(query("SELECT DISTINCT string_col, another_string_col" + + " FROM \"SELECT string_col, another_string_col" + + " FROM duplicate_values_in_columns WHERE dim_col = another_dim_col\"")) + .matches("VALUES (VARCHAR 'string1', VARCHAR 'string1')," + + " (VARCHAR 'string1', VARCHAR 'another_string1')," + + " (VARCHAR 'string2', VARCHAR 'another_string2')"); + + assertThat(query("SELECT string_col, sum_metric_col1" + + " FROM \"SELECT string_col," + + " SUM(CASE WHEN dim_col = another_dim_col THEN metric_col1 ELSE 0 END) AS sum_metric_col1" + + " FROM duplicate_values_in_columns GROUP BY string_col ORDER BY string_col\"")) + .matches("VALUES (VARCHAR 'string1', DOUBLE '1110.0')," + + " (VARCHAR 'string2', DOUBLE '100.0')"); + + assertThat(query("SELECT \"percentile(int_col, 90.0)\"" + + " FROM \"SELECT percentile(int_col, 90) FROM " + ALL_TYPES_TABLE + "\"")) + .matches("VALUES (DOUBLE '56.0')"); + + assertThat(query("SELECT bool_col, \"percentile(int_col, 90.0)\"" + + " FROM \"SELECT bool_col, percentile(int_col, 90) FROM " + ALL_TYPES_TABLE + " GROUP BY bool_col\"")) + .matches("VALUES (true, DOUBLE '56.0')," + + " (false, DOUBLE '0.0')"); + + assertThat(query("SELECT \"sqrt(percentile(sqrt(int_col),'26.457513110645905'))\"" + + " FROM \"SELECT sqrt(percentile(sqrt(int_col), sqrt(700))) FROM " + ALL_TYPES_TABLE + "\"")) + .matches("VALUES (DOUBLE '2.7108060108295344')"); + + assertThat(query("SELECT int_col, \"sqrt(percentile(sqrt(int_col),'26.457513110645905'))\"" + + " FROM \"SELECT int_col, sqrt(percentile(sqrt(int_col), sqrt(700))) FROM " + ALL_TYPES_TABLE + " GROUP BY int_col\"")) + .matches("VALUES (54, DOUBLE '2.7108060108295344')," + + " (55, DOUBLE '2.7232698153315003')," + + " (56, DOUBLE '2.7355647997347607')," + + " (0, DOUBLE '0.0')"); + } + + @Test + public void testAggregationPushdownWithArrays() + { + assertThat(query("SELECT string_array_col, count(*) FROM " + ALL_TYPES_TABLE + " WHERE int_col = 54 GROUP BY 1")) + .isNotFullyPushedDown(ExchangeNode.class, ProjectNode.class, AggregationNode.class, ExchangeNode.class, ExchangeNode.class, AggregationNode.class, ProjectNode.class); + assertThat(query("SELECT int_array_col, string_array_col, count(*) FROM " + ALL_TYPES_TABLE + " WHERE int_col = 54 GROUP BY 1, 2")) + .isNotFullyPushedDown(ExchangeNode.class, ProjectNode.class, AggregationNode.class, ExchangeNode.class, ExchangeNode.class, AggregationNode.class, ProjectNode.class); + assertThat(query("SELECT int_array_col, \"count(*)\"" + + " FROM \"SELECT int_array_col, COUNT(*) FROM " + ALL_TYPES_TABLE + + " WHERE int_col = 54 GROUP BY 1\"")) + .isFullyPushedDown() + .matches("VALUES (-10001, BIGINT '3')," + + "(54, BIGINT '3')," + + "(1000, BIGINT '3')"); + assertThat(query("SELECT int_array_col, string_array_col, \"count(*)\"" + + " FROM \"SELECT int_array_col, string_array_col, COUNT(*) FROM " + ALL_TYPES_TABLE + + " WHERE int_col = 56 AND string_col = 'string_8400' GROUP BY 1, 2\"")) + .isFullyPushedDown() + .matches("VALUES (-10001, VARCHAR 'string_8400', BIGINT '1')," + + "(-10001, VARCHAR 'string2_8402', BIGINT '1')," + + "(1000, VARCHAR 'string2_8402', BIGINT '1')," + + "(56, VARCHAR 'string2_8402', BIGINT '1')," + + "(-10001, VARCHAR 'string1_8401', BIGINT '1')," + + "(56, VARCHAR 'string1_8401', BIGINT '1')," + + "(1000, VARCHAR 'string_8400', BIGINT '1')," + + "(56, VARCHAR 'string_8400', BIGINT '1')," + + "(1000, VARCHAR 'string1_8401', BIGINT '1')"); + } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java index 52eeb7be2ee7..f1b0e1cc5435 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java @@ -35,6 +35,7 @@ import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.extractPql; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; +import static java.lang.String.join; import static java.util.Locale.ENGLISH; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; @@ -55,8 +56,7 @@ public void testSelectNoFilter() .collect(toList()); long limit = 230; String query = format("select %s from %s order by %s limit %s", - columnNames.stream() - .collect(joining(", ")), + join(", ", columnNames), tableName, orderByColumns.stream() .collect(joining(", ")) + " desc", diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotQueryBase.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotQueryBase.java index ee226ad98a18..3c576f339fd8 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotQueryBase.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotQueryBase.java @@ -41,7 +41,8 @@ public class TestPinotQueryBase protected List getColumnNames(String table) { - return pinotMetadata.getPinotColumns(table).stream() + return pinotMetadata.getColumnsMetadata(table).stream() + .map(PinotColumnHandle::fromColumnMetadata) .map(PinotColumnHandle::getColumnName) .collect(toImmutableList()); } diff --git a/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_realtimeSpec.json new file mode 100644 index 000000000000..d97fb7d57756 --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_realtimeSpec.json @@ -0,0 +1,45 @@ +{ + "tableName": "duplicate_values_in_columns", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "updated_at_seconds", + "timeType": "SECONDS", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "365", + "segmentPushType": "APPEND", + "segmentPushFrequency": "daily", + "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", + "schemaName": "duplicate_values_in_columns", + "replicasPerPartition": "1" + }, + "tenants": { + "broker": "DefaultTenant", + "server": "DefaultTenant" + }, + "tableIndexConfig": { + "loadMode": "MMAP", + "invertedIndexColumns": [], + "sortedColumn": ["updated_at_seconds"], + "starTreeIndexConfigs": [], + "nullHandlingEnabled": "true", + "streamConfigs": { + "streamType": "kafka", + "stream.kafka.consumer.type": "LowLevel", + "stream.kafka.topic.name": "duplicate_values_in_columns", + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", + "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", + "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.zk.broker.url": "zookeeper:2181/", + "stream.kafka.broker.list": "kafka:9092", + "realtime.segment.flush.threshold.time": "1m", + "realtime.segment.flush.threshold.size": "0", + "realtime.segment.flush.desired.size": "1M", + "isolation.level": "read_committed", + "stream.kafka.consumer.prop.auto.offset.reset": "smallest", + "stream.kafka.consumer.prop.group.id": "pinot_duplicate_values_in_columns" + } + }, + "metadata": { + "customConfigs": {} + } +} diff --git a/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_schema.json b/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_schema.json new file mode 100644 index 000000000000..6a4ca08cd280 --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_schema.json @@ -0,0 +1,41 @@ +{ + "schemaName": "duplicate_values_in_columns", + "dimensionFieldSpecs": [ + { + "name": "dim_col", + "dataType": "LONG" + }, + { + "name": "another_dim_col", + "dataType": "LONG" + }, + { + "name": "string_col", + "dataType": "STRING" + }, + { + "name": "another_string_col", + "dataType": "STRING" + } + ], + "metricFieldSpecs": [ + { + "name": "metric_col1", + "dataType": "LONG" + }, + { + "name": "metric_col2", + "dataType": "LONG" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "updated_at_seconds", + "dataType": "LONG", + "defaultNullValue" : 0, + "format": "1:SECONDS:EPOCH", + "transformFunction": "toEpochSeconds(updated_at)", + "granularity" : "1:SECONDS" + } + ] +}