Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;

import static com.google.common.base.Verify.verify;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_DAY;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_SECOND;
Expand Down Expand Up @@ -63,9 +61,8 @@ public static DecodedTimestamp decodeInt96Timestamp(Binary timestampBinary)

public static DecodedTimestamp decodeInt96Timestamp(long timeOfDayNanos, int julianDay)
{
verify(timeOfDayNanos >= 0 && timeOfDayNanos < NANOSECONDS_PER_DAY, "Invalid timeOfDayNanos: %s", timeOfDayNanos);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has this been considered in #19169?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That PR was removing validation from io.trino.plugin.base.type.DecodedTimestamp#DecodedTimestamp constructor too, which we want to avoid. I don't think that PR attempted anything other than just removing validation.

long epochSeconds = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * SECONDS_PER_DAY + timeOfDayNanos / NANOSECONDS_PER_SECOND;
return new DecodedTimestamp(epochSeconds, (int) (timeOfDayNanos % NANOSECONDS_PER_SECOND));
long epochSeconds = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * SECONDS_PER_DAY + floorDiv(timeOfDayNanos, NANOSECONDS_PER_SECOND);
return new DecodedTimestamp(epochSeconds, (int) floorMod(timeOfDayNanos, NANOSECONDS_PER_SECOND));
}

public static DecodedTimestamp decodeInt64Timestamp(long timestamp, LogicalTypeAnnotation.TimeUnit precision)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,45 @@
*/
package io.trino.parquet.reader;

import com.google.common.collect.ImmutableList;
import com.google.common.io.Resources;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.parquet.DataPage;
import io.trino.parquet.DataPageV2;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.PrimitiveField;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.plugin.base.type.DecodedTimestamp;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.Fixed12Block;
import io.trino.spi.type.SqlTimestamp;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTimeZone;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.BiFunction;

import static io.airlift.slice.Slices.EMPTY_SLICE;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.parquet.ParquetEncoding.PLAIN;
import static io.trino.parquet.ParquetTestUtils.createParquetReader;
import static io.trino.parquet.reader.TestingColumnReader.encodeInt96Timestamp;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -80,8 +88,59 @@ public class TestInt96Timestamp
LocalDateTime.of(2022, 2, 3, 12, 8, 51, 999999999),
LocalDateTime.of(123456, 1, 2, 3, 4, 5, 678901234)};

@Test(dataProvider = "testVariousTimestampsDataProvider")
public void testVariousTimestamps(TimestampType type, BiFunction<Block, Integer, DecodedTimestamp> actualValuesProvider)
@Test
public void testVariousTimestamps()
throws IOException
{
testVariousTimestamps(TIMESTAMP_MILLIS);
testVariousTimestamps(TIMESTAMP_MICROS);
testVariousTimestamps(TIMESTAMP_NANOS);
testVariousTimestamps(TIMESTAMP_PICOS);
}

@Test
public void testNanosOutsideDayRange()
throws IOException, URISyntaxException
{
List<String> columnNames = ImmutableList.of("timestamp");
List<Type> types = ImmutableList.of(TIMESTAMP_NANOS);

// int96_timestamps_nanos_outside_day_range.parquet file is prepared with timeOfDayNanos values which are
// outside the [0, NANOSECONDS_PER_DAY] range to simulate data generated by AWS wrangler.
// https://github.com/aws/aws-sdk-pandas/issues/592#issuecomment-920716270
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that comment suggests it might be a bug in pyarrow.
Will it be fixed there?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's tackled by providing a flag to make awswrangler stop writing int96 timestamps
aws/aws-sdk-pandas#592 (comment)

wr.s3.to_parquet(..., pyarrow_additional_kwargs={"use_deprecated_int96_timestamps": False})

// All other known parquet writers don't violate the [0, NANOSECONDS_PER_DAY] range for timeOfDayNanos
ParquetDataSource dataSource = new FileParquetDataSource(
new File(Resources.getResource("int96_timestamps_nanos_outside_day_range.parquet").toURI()),
new ParquetReaderOptions());
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames);

Page page = reader.nextPage();
ImmutableList.Builder<LocalDateTime> builder = ImmutableList.builder();
while (page != null) {
Fixed12Block block = (Fixed12Block) page.getBlock(0).getLoadedBlock();
for (int i = 0; i < block.getPositionCount(); i++) {
builder.add(toLocalDateTime(block, i));
}
page = reader.nextPage();
}
assertThat(builder.build()).containsExactlyInAnyOrder(
LocalDateTime.of(-5001, 12, 31, 4, 22, 57, 193656253),
LocalDateTime.of(1970, 1, 1, 13, 43, 58, 721344111),
LocalDateTime.of(1969, 12, 30, 22, 14, 51, 243235321),
LocalDateTime.of(-1, 4, 2, 22, 35, 10, 668330477),
LocalDateTime.of(0, 12, 30, 0, 7, 53, 939664765),
LocalDateTime.of(1410, 7, 15, 2, 18, 26, 329074140),
LocalDateTime.of(1920, 8, 17, 12, 56, 2, 190285077),
LocalDateTime.of(1969, 12, 31, 12, 11, 30, 879147442),
LocalDateTime.of(1969, 12, 30, 6, 26, 40, 679553451),
LocalDateTime.of(1970, 1, 2, 3, 38, 38, 483312394),
LocalDateTime.of(1970, 1, 2, 12, 0, 27, 672539248),
LocalDateTime.of(2022, 2, 4, 7, 55, 30, 455814445),
LocalDateTime.of(123456, 1, 2, 0, 56, 30, 494898191));
}

private void testVariousTimestamps(TimestampType type)
throws IOException
{
int valueCount = TIMESTAMPS.length;
Expand Down Expand Up @@ -127,31 +186,30 @@ public void testVariousTimestamps(TimestampType type, BiFunction<Block, Integer,
expectedNanos = 0;
}

DecodedTimestamp actual = actualValuesProvider.apply(block, i);
DecodedTimestamp actual = toDecodedTimestamp(type, block, i);
assertThat(actual.epochSeconds()).isEqualTo(expectedEpochSeconds);
assertThat(actual.nanosOfSecond()).isEqualTo(expectedNanos);
}
}

@DataProvider(name = "testVariousTimestampsDataProvider")
public Object[][] testVariousTimestampsDataProvider()
private static DecodedTimestamp toDecodedTimestamp(TimestampType timestampType, Block block, int position)
{
BiFunction<Block, Integer, DecodedTimestamp> shortTimestamp = (block, i) -> {
long value = TIMESTAMP_MICROS.getLong(block, i);
if (timestampType.isShort()) {
long value = TIMESTAMP_MICROS.getLong(block, position);
return new DecodedTimestamp(floorDiv(value, MICROSECONDS_PER_SECOND), floorMod(value, MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND);
};
BiFunction<Block, Integer, DecodedTimestamp> longTimestamp = (block, i) ->
{
Fixed12Block fixed12Block = (Fixed12Block) block;
return new DecodedTimestamp(
floorDiv(fixed12Block.getFixed12First(i), MICROSECONDS_PER_SECOND),
floorMod(fixed12Block.getFixed12First(i), MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND + fixed12Block.getFixed12Second(i) / PICOSECONDS_PER_NANOSECOND);
};

return new Object[][] {
new Object[] {TIMESTAMP_MILLIS, shortTimestamp},
new Object[] {TIMESTAMP_MICROS, shortTimestamp},
new Object[] {TIMESTAMP_NANOS, longTimestamp},
new Object[] {TIMESTAMP_PICOS, longTimestamp}};
}
Fixed12Block fixed12Block = (Fixed12Block) block;
return new DecodedTimestamp(
floorDiv(fixed12Block.getFixed12First(position), MICROSECONDS_PER_SECOND),
floorMod(fixed12Block.getFixed12First(position), MICROSECONDS_PER_SECOND) * NANOSECONDS_PER_MICROSECOND + fixed12Block.getFixed12Second(position) / PICOSECONDS_PER_NANOSECOND);
}

private static LocalDateTime toLocalDateTime(Fixed12Block block, int position)
{
long epochMicros = block.getFixed12First(position);
int picosOfMicro = block.getFixed12Second(position);

return SqlTimestamp.newInstance(9, epochMicros, picosOfMicro)
.toLocalDateTime();
}
}
Binary file not shown.