Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,22 +160,23 @@ Data types

Pinot does not allow null values in any data type and supports the following primitive types:

========================== ============
========================== ==============
Pinot Trino
========================== ============
========================== ==============
``INT`` ``INTEGER``
``LONG`` ``BIGINT``
``FLOAT`` ``REAL``
``DOUBLE`` ``DOUBLE``
``STRING`` ``VARCHAR``
``BYTES`` ``VARBINARY``
``JSON`` ``JSON``
``TIMESTAMP`` ``TIMESTAMP``
``INT_ARRAY`` ``VARCHAR``
``LONG_ARRAY`` ``VARCHAR``
``FLOAT_ARRAY`` ``VARCHAR``
``DOUBLE_ARRAY`` ``VARCHAR``
``STRING_ARRAY`` ``VARCHAR``
========================== ============
========================== ==============

.. _pinot-sql-support:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import io.airlift.slice.Slices;
import io.trino.plugin.pinot.client.PinotDataFetcher;
import io.trino.plugin.pinot.client.PinotDataTableWithSize;
import io.trino.plugin.pinot.conversion.PinotTimestamps;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
Expand Down Expand Up @@ -167,7 +169,13 @@ private void writeBlock(BlockBuilder blockBuilder, Type columnType, int columnId
writeBooleanBlock(blockBuilder, columnType, columnIdx);
}
else if (javaType.equals(long.class)) {
writeLongBlock(blockBuilder, columnType, columnIdx);
if (columnType instanceof TimestampType) {
// Pinot TimestampType is always ShortTimestampType.
writeShortTimestampBlock(blockBuilder, columnType, columnIdx);
}
else {
writeLongBlock(blockBuilder, columnType, columnIdx);
}
}
else if (javaType.equals(double.class)) {
writeDoubleBlock(blockBuilder, columnType, columnIdx);
Expand Down Expand Up @@ -229,6 +237,15 @@ private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int col
}
}

private void writeShortTimestampBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
{
for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) {
// Trino is using micros since epoch for ShortTimestampType, Pinot uses millis since epoch.
columnType.writeLong(blockBuilder, PinotTimestamps.toMicros(getLong(i, columnIndex)));
completedBytes += Long.BYTES;
}
}

private Type getType(int columnIndex)
{
checkArgument(columnIndex < columnHandles.size(), "Invalid field index");
Expand All @@ -254,6 +271,7 @@ private long getLong(int rowIndex, int columnIndex)
case FLOAT:
return floatToIntBits(currentDataTable.getDataTable().getFloat(rowIndex, columnIndex));
case LONG:
case TIMESTAMP:
return currentDataTable.getDataTable().getLong(rowIndex, columnIndex);
default:
throw new PinotException(PINOT_DECODE_ERROR, Optional.empty(), format("Unexpected pinot type: '%s'", dataType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
Expand Down Expand Up @@ -95,6 +96,8 @@ private Type toTrinoType(FieldSpec.DataType dataType)
return jsonTypeSupplier.get();
case BYTES:
return VarbinaryType.VARBINARY;
case TIMESTAMP:
return TimestampType.TIMESTAMP_MILLIS;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.conversion;

import com.google.common.primitives.Longs;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_INSTANT;

public final class PinotTimestamps
{
private static final DateTimeFormatter PINOT_TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS][.SS][.S]");

private PinotTimestamps() {}

public static long toMicros(long millis)
{
return millis * MICROSECONDS_PER_MILLISECOND;
}

public static long toMicros(Instant instant)
{
return toMicros(instant.toEpochMilli());
}

public static LocalDateTime tryParse(String value)
{
Long epochMillis = Longs.tryParse(value);
if (epochMillis != null) {
return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), UTC);
}
// Try parsing using standard formats
LocalDateTime timestamp = tryParse(PINOT_TIMESTAMP_FORMATTER, value);
if (timestamp == null) {
timestamp = tryParse(ISO_INSTANT, value);
}
return timestamp;
}

private static LocalDateTime tryParse(DateTimeFormatter formatter, String value)
{
try {
return formatter.parse(value, LocalDateTime::from);
}
catch (DateTimeParseException e) {
// Ignore
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;

Expand Down Expand Up @@ -55,6 +56,9 @@ public static Decoder createDecoder(Type type)
if (type instanceof BooleanType) {
return new BooleanDecoder();
}
if (type instanceof TimestampType) {
return new TimestampDecoder();
}
throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "type '" + type + "' not supported");
}
if (type instanceof ArrayType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.decoders;

import io.trino.plugin.pinot.conversion.PinotTimestamps;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockBuilder;

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.function.Supplier;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;

public class TimestampDecoder
implements Decoder
{
@Override
public void decode(Supplier<Object> getter, BlockBuilder output)
{
Object value = getter.get();
if (value == null) {
output.appendNull();
}
else {
LocalDateTime timestamp;
if (value instanceof String stringValue) {
timestamp = PinotTimestamps.tryParse(stringValue);
if (timestamp == null) {
throw new TrinoException(NOT_SUPPORTED, format(
"Unable to parse string representation of type TIMESTAMP: %s [%s]",
value,
value.getClass().getSimpleName()));
}
}
else if (value instanceof Double || value instanceof Long) {
timestamp = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Number) value).longValue()), UTC);
}
else {
throw new TrinoException(NOT_SUPPORTED, format(
"Unsupported representation of type TIMESTAMP: %s [%s]",
value,
value.getClass().getSimpleName()));
}
long epochMicros = PinotTimestamps.toMicros(timestamp.atOffset(UTC).toInstant());
TIMESTAMP_MILLIS.writeLong(output, epochMicros);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.RealType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
Expand Down Expand Up @@ -167,9 +169,20 @@ private static Object convertValue(Type type, Object value)
if (type instanceof VarbinaryType) {
return Hex.encodeHexString(((Slice) value).getBytes());
}
if (type instanceof TimestampType) {
return toMillis((Long) value);
}
return value;
}

private static Long toMillis(Long value)
{
if (value == null) {
return null;
}
return Timestamps.epochMicrosToMillisWithRounding(value);
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated
}

private static String toConjunct(String columnName, String operator, Object value)
{
if (value instanceof Slice) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -48,7 +49,7 @@ public class ImplementMinMax
implements AggregateFunctionRule<AggregateExpression, Void>
{
private static final Capture<Variable> ARGUMENT = newCapture();
private static final Set<Type> SUPPORTED_ARGUMENT_TYPES = ImmutableSet.of(INTEGER, BIGINT, REAL, DOUBLE);
private static final Set<Type> SUPPORTED_ARGUMENT_TYPES = ImmutableSet.of(INTEGER, BIGINT, REAL, DOUBLE, TIMESTAMP_MILLIS);

private final Function<String, String> identifierQuote;

Expand Down
Loading