@@ -356,22 +356,22 @@ public LocalDateTime read(LocalDateTime reuse) {
    }
  }

-  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+  private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

    private TimestampInt96Reader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
-    public LocalDateTime read(LocalDateTime reuse) {
+    public OffsetDateTime read(OffsetDateTime reuse) {
      final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
      final long timeOfDayNanos = byteBuffer.getLong();
      final int julianDay = byteBuffer.getInt();

      return Instant
          .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
-          .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC).toLocalDateTime();
+          .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC);
    }
  }

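The hunk above switches TimestampInt96Reader from LocalDateTime to OffsetDateTime, so INT96 values are surfaced as UTC timestamps with zone. INT96 is Parquet's legacy timestamp encoding: 12 little-endian bytes, the first 8 holding nanoseconds within the day and the last 4 a Julian day number. The following standalone sketch repeats the same arithmetic outside the reader; the class name, helper method, and sample values are illustrative only and are not part of the change.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

public class Int96DecodeSketch {
  // 1970-01-01 as a Julian day number, the same constant the reader uses.
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  // Decode a 12-byte INT96 value (little-endian: 8 bytes nanos-of-day, then 4 bytes Julian day)
  // into a UTC OffsetDateTime, mirroring TimestampInt96Reader.read().
  static OffsetDateTime decodeInt96(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong();
    int julianDay = buf.getInt();
    return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
        .plusNanos(timeOfDayNanos)
        .atOffset(ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    // Sample value: the Julian day of the Unix epoch plus one second of nanos-of-day.
    ByteBuffer sample = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    sample.putLong(1_000_000_000L);          // nanoseconds within the day
    sample.putInt((int) UNIX_EPOCH_JULIAN);  // Julian day number
    System.out.println(decodeInt96(sample.array()));  // prints 1970-01-01T00:00:01Z
  }
}
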
@@ -156,6 +156,8 @@ public Type primitive(PrimitiveType primitive) {
        return Types.DoubleType.get();
      case FIXED_LEN_BYTE_ARRAY:
        return Types.FixedType.ofLength(primitive.getTypeLength());
+      case INT96:
+        return Types.TimestampType.withZone();
      case BINARY:
        return Types.BinaryType.get();
    }
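The hunk above fills in the schema-conversion side: a Parquet INT96 column is now mapped to Iceberg's timestamp with zone (previously there was no case for INT96). A minimal sketch of what that mapped type is, assuming only the public org.apache.iceberg.types API:

import org.apache.iceberg.types.Types;

public class Int96TypeMappingSketch {
  public static void main(String[] args) {
    // The type the converter now returns for INT96 columns: a timestamp that is
    // adjusted to UTC, printed as "timestamptz" in Iceberg schemas.
    Types.TimestampType mapped = Types.TimestampType.withZone();
    System.out.println(mapped);                      // timestamptz
    System.out.println(mapped.shouldAdjustToUTC());  // true
  }
}
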
@@ -26,14 +26,23 @@
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetWriteAdapter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.TypeUtil;
@@ -95,6 +104,29 @@ protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
    }
  }

+  protected Table tableFromInputFile(InputFile inputFile, Schema schema) throws IOException {
+    HadoopTables tables = new HadoopTables();
+    Table table =
+        tables.create(
+            schema,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(),
+            temp.newFolder().getCanonicalPath());
+
+    table
+        .newAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withFormat(FileFormat.PARQUET)
+                .withInputFile(inputFile)
+                .withMetrics(ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault()))
+                .withFileSizeInBytes(inputFile.getLength())
+                .build())
+        .commit();
+
+    return table;
+  }
+
  @Test
  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
    String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
@@ -120,9 +152,21 @@ public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
      writer.addAll(rows);
    }

-    List<InternalRow> readRows = rowsFromFile(Files.localInput(outputFilePath), schema);
+    InputFile parquetInputFile = Files.localInput(outputFilePath);
+    List<InternalRow> readRows = rowsFromFile(parquetInputFile, schema);
    Assert.assertEquals(rows.size(), readRows.size());
    Assert.assertThat(readRows, CoreMatchers.is(rows));
+
+    // Now we try to import that file as an Iceberg table to make sure Iceberg can read
+    // Int96 end to end.
+    Table int96Table = tableFromInputFile(parquetInputFile, schema);
+    List<Record> tableRecords = Lists.newArrayList(IcebergGenerics.read(int96Table).build());
+
+    Assert.assertEquals(rows.size(), tableRecords.size());
+
+    for (int i = 0; i < tableRecords.size(); i++) {
+      GenericsHelpers.assertEqualsUnsafe(schema.asStruct(), tableRecords.get(i), rows.get(i));
+    }
  }

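The test above writes a Parquet file with INT96 timestamps (the layout Spark historically produced), imports that file into an Iceberg table through the new tableFromInputFile helper, and verifies that the generic reader returns the original rows. For context, this is roughly how a Spark job is configured to emit INT96 timestamps in the first place; the session setup and output path below are illustrative assumptions, not part of this change.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WriteInt96WithSparkSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("int96-writer")
        // Spark's legacy Parquet timestamp encoding, also read by older Hive and Impala.
        .config("spark.sql.parquet.outputTimestampType", "INT96")
        .getOrCreate();

    // Any timestamp column written by this session is stored as INT96 in the Parquet file.
    Dataset<Row> df = spark.sql("SELECT timestamp '2020-07-01 10:11:12.123456' AS ts");
    df.write().mode("overwrite").parquet("/tmp/parquet_int96");  // illustrative output path

    spark.stop();
  }
}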