Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
Expand Down Expand Up @@ -264,6 +266,10 @@ public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveTy
case INT64:
case DOUBLE:
return new ParquetValueReaders.UnboxedReader<>(desc);
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return new TimestampInt96Reader(desc);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
Expand Down Expand Up @@ -321,6 +327,29 @@ public DecimalData read(DecimalData ignored) {
}
}

private static class TimestampInt96Reader extends ParquetValueReaders.UnboxedReader<Long> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Long read(Long ignored) {
return readLong();
}

@Override
public long readLong() {
final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) +
TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
}
}

private static class MicrosToTimestampTzReader extends ParquetValueReaders.UnboxedReader<TimestampData> {
MicrosToTimestampTzReader(ColumnDescriptor desc) {
super(desc);
Expand Down