diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTimestampUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTimestampUtils.java index a5c728aa47fd..117d1e0adb13 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTimestampUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTimestampUtils.java @@ -20,11 +20,9 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.LogicalTypeAnnotation; -import static com.google.common.base.Verify.verify; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND; import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND; -import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_DAY; import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND; import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND; import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_SECOND; @@ -63,9 +61,8 @@ public static DecodedTimestamp decodeInt96Timestamp(Binary timestampBinary) public static DecodedTimestamp decodeInt96Timestamp(long timeOfDayNanos, int julianDay) { - verify(timeOfDayNanos >= 0 && timeOfDayNanos < NANOSECONDS_PER_DAY, "Invalid timeOfDayNanos: %s", timeOfDayNanos); - long epochSeconds = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * SECONDS_PER_DAY + timeOfDayNanos / NANOSECONDS_PER_SECOND; - return new DecodedTimestamp(epochSeconds, (int) (timeOfDayNanos % NANOSECONDS_PER_SECOND)); + long epochSeconds = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * SECONDS_PER_DAY + floorDiv(timeOfDayNanos, NANOSECONDS_PER_SECOND); + return new DecodedTimestamp(epochSeconds, (int) floorMod(timeOfDayNanos, NANOSECONDS_PER_SECOND)); } public static DecodedTimestamp decodeInt64Timestamp(long timestamp, LogicalTypeAnnotation.TimeUnit precision) diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java 
b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java index 8e0110c31dc2..aabb734e5b0c 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java @@ -13,37 +13,45 @@ */ package io.trino.parquet.reader; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Resources; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.parquet.DataPage; import io.trino.parquet.DataPageV2; +import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.PrimitiveField; +import io.trino.parquet.metadata.ParquetMetadata; import io.trino.plugin.base.type.DecodedTimestamp; +import io.trino.spi.Page; import io.trino.spi.block.Block; import io.trino.spi.block.Fixed12Block; +import io.trino.spi.type.SqlTimestamp; import io.trino.spi.type.TimestampType; import io.trino.spi.type.Timestamps; +import io.trino.spi.type.Type; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; import org.apache.parquet.schema.Types; import org.joda.time.DateTimeZone; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; +import org.junit.jupiter.api.Test; +import java.io.File; import java.io.IOException; +import java.net.URISyntaxException; import java.time.LocalDateTime; import java.util.List; import java.util.Optional; import java.util.OptionalLong; -import java.util.function.BiFunction; import static io.airlift.slice.Slices.EMPTY_SLICE; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.parquet.ParquetEncoding.PLAIN; +import static 
io.trino.parquet.ParquetTestUtils.createParquetReader; import static io.trino.parquet.reader.TestingColumnReader.encodeInt96Timestamp; import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; @@ -80,8 +88,59 @@ public class TestInt96Timestamp LocalDateTime.of(2022, 2, 3, 12, 8, 51, 999999999), LocalDateTime.of(123456, 1, 2, 3, 4, 5, 678901234)}; - @Test(dataProvider = "testVariousTimestampsDataProvider") - public void testVariousTimestamps(TimestampType type, BiFunction actualValuesProvider) + @Test + public void testVariousTimestamps() + throws IOException + { + testVariousTimestamps(TIMESTAMP_MILLIS); + testVariousTimestamps(TIMESTAMP_MICROS); + testVariousTimestamps(TIMESTAMP_NANOS); + testVariousTimestamps(TIMESTAMP_PICOS); + } + + @Test + public void testNanosOutsideDayRange() + throws IOException, URISyntaxException + { + List columnNames = ImmutableList.of("timestamp"); + List types = ImmutableList.of(TIMESTAMP_NANOS); + + // int96_timestamps_nanos_outside_day_range.parquet file is prepared with timeOfDayNanos values which are + // outside the [0, NANOSECONDS_PER_DAY] range to simulate data generated by AWS wrangler. 
+ // https://github.com/aws/aws-sdk-pandas/issues/592#issuecomment-920716270 + // All other known parquet writers don't violate the [0, NANOSECONDS_PER_DAY] range for timeOfDayNanos + ParquetDataSource dataSource = new FileParquetDataSource( + new File(Resources.getResource("int96_timestamps_nanos_outside_day_range.parquet").toURI()), + new ParquetReaderOptions()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); + + Page page = reader.nextPage(); + ImmutableList.Builder builder = ImmutableList.builder(); + while (page != null) { + Fixed12Block block = (Fixed12Block) page.getBlock(0).getLoadedBlock(); + for (int i = 0; i < block.getPositionCount(); i++) { + builder.add(toLocalDateTime(block, i)); + } + page = reader.nextPage(); + } + assertThat(builder.build()).containsExactlyInAnyOrder( + LocalDateTime.of(-5001, 12, 31, 4, 22, 57, 193656253), + LocalDateTime.of(1970, 1, 1, 13, 43, 58, 721344111), + LocalDateTime.of(1969, 12, 30, 22, 14, 51, 243235321), + LocalDateTime.of(-1, 4, 2, 22, 35, 10, 668330477), + LocalDateTime.of(0, 12, 30, 0, 7, 53, 939664765), + LocalDateTime.of(1410, 7, 15, 2, 18, 26, 329074140), + LocalDateTime.of(1920, 8, 17, 12, 56, 2, 190285077), + LocalDateTime.of(1969, 12, 31, 12, 11, 30, 879147442), + LocalDateTime.of(1969, 12, 30, 6, 26, 40, 679553451), + LocalDateTime.of(1970, 1, 2, 3, 38, 38, 483312394), + LocalDateTime.of(1970, 1, 2, 12, 0, 27, 672539248), + LocalDateTime.of(2022, 2, 4, 7, 55, 30, 455814445), + LocalDateTime.of(123456, 1, 2, 0, 56, 30, 494898191)); + } + + private void testVariousTimestamps(TimestampType type) throws IOException { int valueCount = TIMESTAMPS.length; @@ -127,31 +186,30 @@ public void testVariousTimestamps(TimestampType type, BiFunction shortTimestamp = (block, i) -> { - long value = TIMESTAMP_MICROS.getLong(block, i); + if 
(timestampType.isShort()) { + long value = TIMESTAMP_MICROS.getLong(block, position); return new DecodedTimestamp(floorDiv(value, MICROSECONDS_PER_SECOND), floorMod(value, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND); - }; - BiFunction longTimestamp = (block, i) -> - { - Fixed12Block fixed12Block = (Fixed12Block) block; - return new DecodedTimestamp( - floorDiv(fixed12Block.getFixed12First(i), MICROSECONDS_PER_SECOND), - floorMod(fixed12Block.getFixed12First(i), MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND + fixed12Block.getFixed12Second(i) / PICOSECONDS_PER_NANOSECOND); - }; - - return new Object[][] { - new Object[] {TIMESTAMP_MILLIS, shortTimestamp}, - new Object[] {TIMESTAMP_MICROS, shortTimestamp}, - new Object[] {TIMESTAMP_NANOS, longTimestamp}, - new Object[] {TIMESTAMP_PICOS, longTimestamp}}; + } + Fixed12Block fixed12Block = (Fixed12Block) block; + return new DecodedTimestamp( + floorDiv(fixed12Block.getFixed12First(position), MICROSECONDS_PER_SECOND), + floorMod(fixed12Block.getFixed12First(position), MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND + fixed12Block.getFixed12Second(position) / PICOSECONDS_PER_NANOSECOND); + } + + private static LocalDateTime toLocalDateTime(Fixed12Block block, int position) + { + long epochMicros = block.getFixed12First(position); + int picosOfMicro = block.getFixed12Second(position); + + return SqlTimestamp.newInstance(9, epochMicros, picosOfMicro) + .toLocalDateTime(); } } diff --git a/lib/trino-parquet/src/test/resources/int96_timestamps_nanos_outside_day_range.parquet b/lib/trino-parquet/src/test/resources/int96_timestamps_nanos_outside_day_range.parquet new file mode 100644 index 000000000000..9ddd5c6a8bc6 Binary files /dev/null and b/lib/trino-parquet/src/test/resources/int96_timestamps_nanos_outside_day_range.parquet differ