Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,22 @@ Data types

Pinot does not allow null values in any data type and supports the following primitive types:

========================== ============
========================== ==============
Pinot Trino
========================== ============
========================== ==============
``INT`` ``INTEGER``
``LONG`` ``BIGINT``
``FLOAT`` ``REAL``
``DOUBLE`` ``DOUBLE``
``STRING`` ``VARCHAR``
``BYTES`` ``VARBINARY``
``TIMESTAMP`` ``TIMESTAMP``
``INT_ARRAY`` ``VARCHAR``
``LONG_ARRAY`` ``VARCHAR``
``FLOAT_ARRAY`` ``VARCHAR``
``DOUBLE_ARRAY`` ``VARCHAR``
``STRING_ARRAY`` ``VARCHAR``
========================== ============
========================== ==============

.. _pinot-sql-support:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
Expand Down Expand Up @@ -131,6 +132,8 @@ public static Type getTrinoTypeFromPinotType(FieldSpec.DataType dataType)
return VarcharType.VARCHAR;
case BYTES:
return VarbinaryType.VARBINARY;
case TIMESTAMP:
return TimestampType.TIMESTAMP_MILLIS;
Comment thread
hashhar marked this conversation as resolved.
Outdated
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airlift.slice.Slices;
import io.trino.plugin.pinot.client.PinotDataFetcher;
import io.trino.plugin.pinot.client.PinotDataTableWithSize;
import io.trino.plugin.pinot.conversion.PinotTimestamps;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -165,6 +166,7 @@ private void writeBlock(BlockBuilder blockBuilder, Type columnType, int columnId
writeBooleanBlock(blockBuilder, columnType, columnIdx);
}
else if (javaType.equals(long.class)) {
// Applies to timestamp as well since precision is milliseconds
writeLongBlock(blockBuilder, columnType, columnIdx);
}
else if (javaType.equals(double.class)) {
Expand Down Expand Up @@ -253,6 +255,8 @@ private long getLong(int rowIndex, int columnIndex)
return floatToIntBits(currentDataTable.getDataTable().getFloat(rowIndex, columnIndex));
case LONG:
return currentDataTable.getDataTable().getLong(rowIndex, columnIndex);
case TIMESTAMP:
return PinotTimestamps.toMicros(currentDataTable.getDataTable().getLong(rowIndex, columnIndex));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Pinot only support timestamps with precision <= 6? If not we should probably add a verify here to make sure we don't silently do incorrect things with higher precision values.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default:
throw new PinotException(PINOT_DECODE_ERROR, Optional.empty(), format("Unexpected pinot type: '%s'", dataType));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.conversion;

import com.google.common.primitives.Longs;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_INSTANT;

public final class PinotTimestamps
{
private static final DateTimeFormatter PINOT_TIMESTAMP_FORMATTER =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be:

    private static final DateTimeFormatter PINOT_TIMESTAMP_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

Can you also add a test for Pinot timestamp Strings e.g.:

PINOT_TIMESTAMP_FORMATTER.parse("2022-06-27 08:10:34.647", LocalDateTime::from);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be:
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS][.SS][.S]")

DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS][.SS][.S]");

private PinotTimestamps() {}

/**
 * Converts an epoch timestamp expressed in milliseconds to microseconds.
 *
 * @param millis milliseconds since the epoch
 * @return the same point in time expressed in microseconds
 * @throws ArithmeticException if the result does not fit in a {@code long}
 */
public static long toMicros(long millis)
{
    // multiplyExact fails loudly instead of silently wrapping around on overflow
    return Math.multiplyExact(millis, MICROSECONDS_PER_MILLISECOND);
}

/**
 * Converts an {@link Instant} to microseconds since the epoch.
 * The instant is first reduced to epoch milliseconds with {@link Instant#toEpochMilli()}
 * and that value is then scaled by {@code toMicros(long)}.
 *
 * @param instant the instant to convert
 * @return microseconds since the epoch, at millisecond granularity
 */
public static long toMicros(Instant instant)
{
    long epochMillis = instant.toEpochMilli();
    return toMicros(epochMillis);
}

public static LocalDateTime tryParse(String value)
{
Long epochMillis = Longs.tryParse(value);
if (epochMillis != null) {
return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), UTC);
}
// Try parsing using standard formats
LocalDateTime timestamp = tryParse(PINOT_TIMESTAMP_FORMATTER, value);
if (timestamp == null) {
timestamp = tryParse(ISO_INSTANT, value);
}
return timestamp;
Comment thread
hashhar marked this conversation as resolved.
Outdated
}

/**
 * Parses {@code value} with {@code formatter} into a {@link LocalDateTime}.
 *
 * @param formatter the formatter to apply
 * @param value the text to parse
 * @return the parsed value, or {@code null} when parsing throws a {@link DateTimeParseException}
 */
private static LocalDateTime tryParse(DateTimeFormatter formatter, String value)
{
    LocalDateTime parsed;
    try {
        parsed = formatter.parse(value, LocalDateTime::from);
    }
    catch (DateTimeParseException ignored) {
        // The text does not match this formatter; signal that with null instead of an exception
        parsed = null;
    }
    return parsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;

Expand Down Expand Up @@ -54,6 +55,9 @@ else if (type instanceof IntegerType) {
else if (type instanceof BooleanType) {
return new BooleanDecoder();
}
else if (type instanceof TimestampType) {
return new TimestampDecoder();
}
else {
throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "type '" + type + "' not supported");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.decoders;

import io.trino.plugin.pinot.conversion.PinotTimestamps;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockBuilder;

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.function.Supplier;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;

public class TimestampDecoder
implements Decoder
{
@Override
public void decode(Supplier<Object> getter, BlockBuilder output)
{
Object value = getter.get();
if (value == null) {
output.appendNull();
}
else {
LocalDateTime timestamp;
if (value instanceof String) {
String valueString = (String) value;
timestamp = PinotTimestamps.tryParse(valueString);
if (timestamp == null) {
throw new TrinoException(NOT_SUPPORTED, format(
"Unable to parse string representation of type TIMESTAMP: %s [%s]",
value,
value.getClass().getSimpleName()));
}
}
else if (value instanceof Double || value instanceof Long) {
timestamp = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), UTC);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid casts from Number. Byte, BigDecimal etc. are all subclasses of Number.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed this to Long

Copy link
Copy Markdown
Member Author

@ddcprg ddcprg May 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hashhar seems like the test fails because the aggregate function returns double instead of long, I need to dig further if we don't want to cast from Number

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hashhar @xiangfu0 Pinot's MAX function returns a double (see https://docs.pinot.apache.org/users/user-guide-query/supported-aggregations). I think the options are either keeping Number or having 2 conditionals: one for Double and one for Long. I'd rather keep checking for the Number type; let me know your thoughts.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think number is ok.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having else if (value instanceof Double || value instanceof Long) is still useful in my opinion.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've now made this change

}
else {
throw new TrinoException(NOT_SUPPORTED, format(
"Unsupported representation of type TIMESTAMP: %s [%s]",
value,
value.getClass().getSimpleName()));
}
long epochMicros = PinotTimestamps.toMicros(timestamp.atOffset(UTC).toInstant());
TIMESTAMP_MILLIS.writeLong(output, epochMicros);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why write microseconds to this TIMESTAMP_MILLIS?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nizarhejazi @ddcprg can you check this?
Also, can you get the CI passing?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiangfu0 I'm absent at the moment. I'll retake this in 2 weeks time

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder

Copy link
Copy Markdown
Member Author

@ddcprg ddcprg Jul 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a temporary conversion to micros to be able to invoke TIMESTAMP_MILLIS.writeLong(output, epochMicros); if there is a better way to write milliseconds to the output block please let me know. Please read the javadocs in ShortTimestampType

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
Expand Down Expand Up @@ -146,6 +147,8 @@ private static Type toTrinoType(DataSchema.ColumnDataType columnDataType)
return VARCHAR;
case BYTES:
return VARBINARY;
case TIMESTAMP:
return TIMESTAMP_MILLIS;
case INT_ARRAY:
return new ArrayType(INTEGER);
case LONG_ARRAY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.RealType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
Expand Down Expand Up @@ -155,15 +157,26 @@ private static Object convertValue(Type type, Object value)
if (type instanceof RealType) {
return intBitsToFloat(toIntExact((Long) value));
}
else if (type instanceof VarcharType) {
if (type instanceof VarcharType) {
return ((Slice) value).toStringUtf8();
}
else if (type instanceof VarbinaryType) {
if (type instanceof VarbinaryType) {
return Hex.encodeHexString(((Slice) value).getBytes());
}
if (type instanceof TimestampType) {
return toMillis((Long) value);
}
return value;
}

/**
 * Null-safe wrapper around {@code Timestamps.epochMicrosToMillisWithRounding}.
 *
 * @param value the boxed value to convert, may be {@code null}
 * @return the converted value, or {@code null} when {@code value} is {@code null}
 */
private static Long toMillis(Long value)
{
    if (value != null) {
        return Timestamps.epochMicrosToMillisWithRounding(value);
    }
    return null;
}

private static String toConjunct(String columnName, String operator, Object value)
{
if (value instanceof Slice) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
Expand Down Expand Up @@ -125,6 +126,9 @@ private static FieldSpec.DataType fromPrimitiveTrinoType(Type type)
if (type instanceof VarbinaryType) {
return FieldSpec.DataType.BYTES;
}
if (type instanceof TimestampType) {
return FieldSpec.DataType.TIMESTAMP;
}
throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -48,7 +49,7 @@ public class ImplementMinMax
implements AggregateFunctionRule<AggregateExpression, Void>
{
private static final Capture<Variable> ARGUMENT = newCapture();
private static final Set<Type> SUPPORTED_ARGUMENT_TYPES = ImmutableSet.of(INTEGER, BIGINT, REAL, DOUBLE);
private static final Set<Type> SUPPORTED_ARGUMENT_TYPES = ImmutableSet.of(INTEGER, BIGINT, REAL, DOUBLE, TIMESTAMP_MILLIS);

private final Function<String, String> identifierQuote;

Expand Down
Loading