From d22b30f317df246554d2ab17f9c280512dc1b1d4 Mon Sep 17 00:00:00 2001 From: Daniel del Castillo Date: Tue, 26 Apr 2022 20:27:48 +0100 Subject: [PATCH] Add timestamp support to Pinot connector --- docs/src/main/sphinx/connector/pinot.rst | 7 +- .../trino/plugin/pinot/PinotColumnHandle.java | 3 + .../plugin/pinot/PinotSegmentPageSource.java | 4 ++ .../pinot/conversion/PinotTimestamps.java | 68 +++++++++++++++++++ .../plugin/pinot/decoders/DecoderFactory.java | 4 ++ .../pinot/decoders/TimestampDecoder.java | 64 +++++++++++++++++ .../pinot/query/DynamicTableBuilder.java | 3 + .../plugin/pinot/query/PinotQueryBuilder.java | 17 ++++- .../plugin/pinot/query/PinotTypeResolver.java | 4 ++ .../query/aggregation/ImplementMinMax.java | 3 +- .../AbstractPinotIntegrationSmokeTest.java | 54 ++++++++++++--- .../test/resources/alltypes_realtimeSpec.json | 2 + .../src/test/resources/alltypes_schema.json | 4 ++ 13 files changed, 220 insertions(+), 17 deletions(-) create mode 100644 plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/conversion/PinotTimestamps.java create mode 100644 plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/TimestampDecoder.java diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst index eecac02b6b5c..3d32496f5cb5 100644 --- a/docs/src/main/sphinx/connector/pinot.rst +++ b/docs/src/main/sphinx/connector/pinot.rst @@ -158,21 +158,22 @@ Data types Pinot does not allow null values in any data type and supports the following primitive types: -========================== ============ +========================== ============== Pinot Trino -========================== ============ +========================== ============== ``INT`` ``INTEGER`` ``LONG`` ``BIGINT`` ``FLOAT`` ``REAL`` ``DOUBLE`` ``DOUBLE`` ``STRING`` ``VARCHAR`` ``BYTES`` ``VARBINARY`` +``TIMESTAMP`` ``TIMESTAMP`` ``INT_ARRAY`` ``VARCHAR`` ``LONG_ARRAY`` ``VARCHAR`` ``FLOAT_ARRAY`` ``VARCHAR`` ``DOUBLE_ARRAY`` ``VARCHAR`` ``STRING_ARRAY`` ``VARCHAR`` -========================== ============ +========================== ============== .. _pinot-sql-support: diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java index 3d0b0c406b4d..a5948007561e 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotColumnHandle.java @@ -24,6 +24,7 @@ import io.trino.spi.type.DoubleType; import io.trino.spi.type.IntegerType; import io.trino.spi.type.RealType; +import io.trino.spi.type.TimestampType; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; @@ -131,6 +132,8 @@ public static Type getTrinoTypeFromPinotType(FieldSpec.DataType dataType) return VarcharType.VARCHAR; case BYTES: return VarbinaryType.VARBINARY; + case TIMESTAMP: + return TimestampType.TIMESTAMP_MILLIS; default: break; } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java index fb74a017c221..a7e3a45d48c3 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java @@ -17,6 +17,7 @@ import io.airlift.slice.Slices; import io.trino.plugin.pinot.client.PinotDataFetcher; import io.trino.plugin.pinot.client.PinotDataTableWithSize; +import io.trino.plugin.pinot.conversion.PinotTimestamps; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.TrinoException; @@ -165,6 +166,7 @@ private void writeBlock(BlockBuilder blockBuilder, Type columnType, int columnId writeBooleanBlock(blockBuilder, columnType, columnIdx); } else if (javaType.equals(long.class)) { + // Applies to timestamp as well since precision is milliseconds writeLongBlock(blockBuilder, columnType, columnIdx); } else if (javaType.equals(double.class)) { @@ -253,6 +255,8 @@ private long getLong(int rowIndex, int columnIndex) return floatToIntBits(currentDataTable.getDataTable().getFloat(rowIndex, columnIndex)); case LONG: return currentDataTable.getDataTable().getLong(rowIndex, columnIndex); + case TIMESTAMP: + return PinotTimestamps.toMicros(currentDataTable.getDataTable().getLong(rowIndex, columnIndex)); default: throw new PinotException(PINOT_DECODE_ERROR, Optional.empty(), format("Unexpected pinot type: '%s'", dataType)); } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/conversion/PinotTimestamps.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/conversion/PinotTimestamps.java new file mode 100644 index 000000000000..359c1fc9d83d --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/conversion/PinotTimestamps.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot.conversion; + +import com.google.common.primitives.Longs; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; +import static java.time.ZoneOffset.UTC; +import static java.time.format.DateTimeFormatter.ISO_INSTANT; + +public final class PinotTimestamps +{ + private static final DateTimeFormatter PINOT_TIMESTAMP_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS][.SS][.S]"); + + private PinotTimestamps() {} + + public static long toMicros(long millis) + { + return millis * MICROSECONDS_PER_MILLISECOND; + } + + public static long toMicros(Instant instant) + { + return toMicros(instant.toEpochMilli()); + } + + public static LocalDateTime tryParse(String value) + { + Long epochMillis = Longs.tryParse(value); + if (epochMillis != null) { + return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), UTC); + } + // Try parsing using standard formats + LocalDateTime timestamp = tryParse(PINOT_TIMESTAMP_FORMATTER, value); + if (timestamp == null) { + timestamp = tryParse(ISO_INSTANT, value); + } + return timestamp; + } + + private static LocalDateTime tryParse(DateTimeFormatter formatter, String value) + { + try { + return formatter.parse(value, LocalDateTime::from); + } + catch (DateTimeParseException e) { + // Ignore + } + return null; + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java index dbc957e69f6e..452b55893b73 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java @@ -21,6 +21,7 @@ import io.trino.spi.type.FixedWidthType; import io.trino.spi.type.IntegerType; import io.trino.spi.type.RealType; +import io.trino.spi.type.TimestampType; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; @@ -54,6 +55,9 @@ else if (type instanceof IntegerType) { else if (type instanceof BooleanType) { return new BooleanDecoder(); } + else if (type instanceof TimestampType) { + return new TimestampDecoder(); + } else { throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "type '" + type + "' not supported"); } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/TimestampDecoder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/TimestampDecoder.java new file mode 100644 index 000000000000..928ce820a5af --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/TimestampDecoder.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot.decoders; + +import io.trino.plugin.pinot.conversion.PinotTimestamps; +import io.trino.spi.TrinoException; +import io.trino.spi.block.BlockBuilder; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.function.Supplier; + +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; +import static java.lang.String.format; +import static java.time.ZoneOffset.UTC; + +public class TimestampDecoder + implements Decoder +{ + @Override + public void decode(Supplier getter, BlockBuilder output) + { + Object value = getter.get(); + if (value == null) { + output.appendNull(); + } + else { + LocalDateTime timestamp; + if (value instanceof String) { + String valueString = (String) value; + timestamp = PinotTimestamps.tryParse(valueString); + if (timestamp == null) { + throw new TrinoException(NOT_SUPPORTED, format( + "Unable to parse string representation of type TIMESTAMP: %s [%s]", + value, + value.getClass().getSimpleName())); + } + } + else if (value instanceof Double || value instanceof Long) { + timestamp = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), UTC); + } + else { + throw new TrinoException(NOT_SUPPORTED, format( + "Unsupported representation of type TIMESTAMP: %s [%s]", + value, + value.getClass().getSimpleName())); + } + long epochMicros = PinotTimestamps.toMicros(timestamp.atOffset(UTC).toInstant()); + TIMESTAMP_MILLIS.writeLong(output, epochMicros); + } + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java index 18c444f82876..a7f313a9e8e7 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java @@ -57,6 +57,7 @@ import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; @@ -146,6 +147,8 @@ private static Type toTrinoType(DataSchema.ColumnDataType columnDataType) return VARCHAR; case BYTES: return VARBINARY; + case TIMESTAMP: + return TIMESTAMP_MILLIS; case INT_ARRAY: return new ArrayType(INTEGER); case LONG_ARRAY: diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java index 3c13188fad61..4920706e9e67 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java @@ -23,6 +23,8 @@ import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.RealType; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.Timestamps; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; @@ -155,15 +157,26 @@ private static Object convertValue(Type type, Object value) if (type instanceof RealType) { return intBitsToFloat(toIntExact((Long) value)); } - else if (type instanceof VarcharType) { + if (type instanceof VarcharType) { return ((Slice) value).toStringUtf8(); } - else if (type instanceof VarbinaryType) { + if (type instanceof VarbinaryType) { return Hex.encodeHexString(((Slice) value).getBytes()); } + if (type instanceof TimestampType) { + return toMillis((Long) value); + } return value; } + private static Long toMillis(Long value) + { + if (value == null) { + return null; + } + return Timestamps.epochMicrosToMillisWithRounding(value); + } + private static String toConjunct(String columnName, String operator, Object value) { if (value instanceof Slice) { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java index 2600a3d7a926..155ca070afdb 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java @@ -25,6 +25,7 @@ import io.trino.spi.type.DoubleType; import io.trino.spi.type.IntegerType; import io.trino.spi.type.RealType; +import io.trino.spi.type.TimestampType; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; @@ -125,6 +126,9 @@ private static FieldSpec.DataType fromPrimitiveTrinoType(Type type) if (type instanceof VarbinaryType) { return FieldSpec.DataType.BYTES; } + if (type instanceof TimestampType) { + return FieldSpec.DataType.TIMESTAMP; + } throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + type); } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementMinMax.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementMinMax.java index ece20466264f..25f612175ace 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementMinMax.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/aggregation/ImplementMinMax.java @@ -39,6 +39,7 @@ import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static java.util.Objects.requireNonNull; /** @@ -48,7 +49,7 @@ public class ImplementMinMax implements AggregateFunctionRule { private static final Capture ARGUMENT = newCapture(); - private static final Set SUPPORTED_ARGUMENT_TYPES = ImmutableSet.of(INTEGER, BIGINT, REAL, DOUBLE); + private static final Set SUPPORTED_ARGUMENT_TYPES = ImmutableSet.of(INTEGER, BIGINT, REAL, DOUBLE, TIMESTAMP_MILLIS); private final Function identifierQuote; diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java index f8bee993fff5..74b71cf4f37c 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java @@ -147,6 +147,7 @@ protected QueryRunner createQueryRunner() ImmutableList.Builder> allTypesRecordsBuilder = ImmutableList.builder(); for (int i = 0, step = 1200; i < MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES - 2; i++) { int offset = i * step; + long updatedAtMillis = initialUpdatedAt.plusMillis(offset).toEpochMilli(); allTypesRecordsBuilder.add(new ProducerRecord<>(ALL_TYPES_TABLE, "key" + i * step, createTestRecord( Arrays.asList("string_" + (offset), "string1_" + (offset + 1), "string2_" + (offset + 2)), @@ -155,7 +156,7 @@ protected QueryRunner createQueryRunner() Arrays.asList(-7.33F + i, Float.POSITIVE_INFINITY, 17.034F + i), Arrays.asList(-17.33D + i, Double.POSITIVE_INFINITY, 10596.034D + i), Arrays.asList(-3147483647L + i, 12L - i, 4147483647L + i), - initialUpdatedAt.plusMillis(offset).toEpochMilli()))); + updatedAtMillis))); } allTypesRecordsBuilder.add(new ProducerRecord<>(ALL_TYPES_TABLE, null, createNullRecord())); @@ -610,6 +611,7 @@ private static GenericRecord createTestRecord( .set("float_array_col", floatArrayColumn) .set("double_array_col", doubleArrayColumn) .set("long_array_col", longArrayColumn) + .set("timestamp_col", updatedAtMillis) .set("int_col", intArrayColumn.get(0)) .set("float_col", floatArrayColumn.get(0)) .set("double_col", doubleArrayColumn.get(0)) @@ -672,6 +674,7 @@ private static Schema getAllTypesAvroSchema() .name("float_array_col").type().optional().array().items().nullable().floatType() .name("double_array_col").type().optional().array().items().nullable().doubleType() .name("long_array_col").type().optional().array().items().nullable().longType() + .name("timestamp_col").type().optional().longType() .name("int_col").type().optional().intType() .name("float_col").type().optional().floatType() .name("double_col").type().optional().doubleType() @@ -1138,6 +1141,13 @@ public void testNullBehavior() { // Verify the null behavior of pinot: + // Default null value for timestamp single value columns is 0 + assertThat(query("SELECT timestamp_col" + + " FROM " + ALL_TYPES_TABLE + + " WHERE string_col = 'null'")) + .matches("VALUES(TIMESTAMP '1970-01-01 00:00:00.000')") + .isFullyPushedDown(); + // Default null value for long single value columns is 0 assertThat(query("SELECT long_col" + " FROM " + ALL_TYPES_TABLE + @@ -1338,6 +1348,13 @@ public void testLimitPushdown() .isNotFullyPushedDown(LimitNode.class); } + @Test + public void testPredicatePushdown() + { + assertThat(query("SELECT timestamp_col FROM " + ALL_TYPES_TABLE + " WHERE timestamp_col < TIMESTAMP '1971-01-01 00:00:00.000'")) + .isFullyPushedDown(); + } + @Test public void testCreateTable() { @@ -1366,7 +1383,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE)) .isFullyPushedDown(); @@ -1375,7 +1393,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE + " LIMIT " + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES)) .isFullyPushedDown(); @@ -1385,7 +1404,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE + " WHERE long_col < 4147483649")) .isFullyPushedDown(); @@ -1394,7 +1414,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE + " WHERE long_col < 4147483649" + " LIMIT " + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES)) .isFullyPushedDown(); @@ -1404,7 +1425,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE + " GROUP BY bool_col")) .isFullyPushedDown(); @@ -1413,7 +1435,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE + " GROUP BY string_col" + " LIMIT " + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES)) .isFullyPushedDown(); @@ -1423,7 +1446,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE + " WHERE long_col < 4147483649 GROUP BY bool_col")) .isFullyPushedDown(); @@ -1432,7 +1456,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE + " WHERE long_col < 4147483649 GROUP BY string_col" + " LIMIT " + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES)) .isFullyPushedDown(); @@ -1443,7 +1468,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM " + ALL_TYPES_TABLE + " WHERE long_col > 4147483649")) .isFullyPushedDown(); @@ -1458,7 +1484,8 @@ public void testAggregationPushdown() " MIN(int_col), MAX(int_col)," + " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + - " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)," + + " MIN(timestamp_col), MAX(timestamp_col)" + " FROM \"SELECT * FROM " + ALL_TYPES_TABLE + " WHERE long_col > 4147483649" + " LIMIT " + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES + "\" GROUP BY string_col")) .isFullyPushedDown(); @@ -1518,6 +1545,9 @@ public void testAggregationPushdown() // Distinct on long is pushed down assertThat(query("SELECT DISTINCT long_col FROM " + ALL_TYPES_TABLE)) .isFullyPushedDown(); + // Distinct on timestamp is pushed down + assertThat(query("SELECT DISTINCT timestamp_col FROM " + ALL_TYPES_TABLE)) + .isFullyPushedDown(); // Distinct on int is partially pushed down assertThat(query("SELECT DISTINCT int_col FROM " + ALL_TYPES_TABLE)) .isNotFullyPushedDown(); @@ -1531,6 +1561,8 @@ public void testAggregationPushdown() .isFullyPushedDown(); assertThat(query("SELECT DISTINCT bool_col, long_col FROM " + ALL_TYPES_TABLE)) .isFullyPushedDown(); + assertThat(query("SELECT DISTINCT bool_col, timestamp_col FROM " + ALL_TYPES_TABLE)) + .isFullyPushedDown(); assertThat(query("SELECT DISTINCT bool_col, int_col FROM " + ALL_TYPES_TABLE)) .isNotFullyPushedDown(); diff --git a/plugin/trino-pinot/src/test/resources/alltypes_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/alltypes_realtimeSpec.json index 18563358b01a..851b78777de8 100644 --- a/plugin/trino-pinot/src/test/resources/alltypes_realtimeSpec.json +++ b/plugin/trino-pinot/src/test/resources/alltypes_realtimeSpec.json @@ -33,10 +33,12 @@ "MIN__float_col", "MIN__double_col", "MIN__long_col", + "MIN__timestamp_col", "MAX__int_col", "MAX__float_col", "MAX__double_col", "MAX__long_col", + "MAX__timestamp_col", "AVG__int_col", "AVG__float_col", "AVG__double_col", diff --git a/plugin/trino-pinot/src/test/resources/alltypes_schema.json b/plugin/trino-pinot/src/test/resources/alltypes_schema.json index 174e056de911..29f7d9f777d6 100644 --- a/plugin/trino-pinot/src/test/resources/alltypes_schema.json +++ b/plugin/trino-pinot/src/test/resources/alltypes_schema.json @@ -43,6 +43,10 @@ "name": "long_array_col", "dataType": "LONG", "singleValueField": false + }, + { + "name": "timestamp_col", + "dataType": "TIMESTAMP" } ], "metricFieldSpecs": [