From 4cbc00709f08384bb6698028dbcfdfba0fe666f7 Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Wed, 1 Mar 2023 07:57:48 +0800
Subject: [PATCH 01/12] Support vectorized read for timestampInt96

---
 .../GenericArrowVectorAccessorFactory.java    | 27 +++++++
 .../vectorized/VectorizedArrowReader.java     | 14 ++++
 .../vectorized/parquet/TimestampUtil.java     | 36 ++++++++++
 .../parquet/VectorizedColumnIterator.java     | 18 +++++
 ...dDictionaryEncodedParquetValuesReader.java | 16 +++++
 .../parquet/VectorizedPageIterator.java       | 30 ++++++++
 ...ectorizedParquetDefinitionLevelReader.java | 41 +++++++++++
 .../source/TestIcebergSourceHadoopTables.java |  5 ++
 .../source/TestIcebergSourceHiveTables.java   | 18 +++--
 .../source/TestIcebergSourceTablesBase.java   | 72 +++++++++++++++++++
 10 files changed, 272 insertions(+), 5 deletions(-)
 create mode 100644 arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/TimestampUtil.java

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index 83f3f62e7018..56313144df70 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -21,6 +21,7 @@
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.function.IntFunction;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
@@ -42,6 +43,7 @@
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -156,6 +158,8 @@ public ArrowVectorAccessor getVecto
         return new DictionaryFloatAccessor<>((IntVector) vector, dictionary);
       case INT64:
         return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
+      case INT96:
+        return new DictionaryTimestampInt96Accessor<>((IntVector) vector, dictionary);
       case DOUBLE:
         return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary);
       default:
@@ -438,6 +442,29 @@ public final byte[] getBinary(int rowId) {
     }
   }

+  private static class DictionaryTimestampInt96Accessor<
+          DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+      extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final IntVector offsetVector;
+    private final Dictionary dictionary;
+
+    DictionaryTimestampInt96Accessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.dictionary = dictionary;
+    }
+
+    @Override
+    public final long getLong(int rowId) {
+      ByteBuffer byteBuffer =
+          dictionary
+              .decodeToBinary(offsetVector.get(rowId))
+              .toByteBuffer()
+              .order(ByteOrder.LITTLE_ENDIAN);
+      return TimestampUtil.extractTimestampInt96(byteBuffer);
+    }
+  }
+
   private static class DateAccessor<
       DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
       extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 79cbfb34bd54..2d1bc16208f7 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -112,6 +112,7 @@ private enum ReadType {
     FLOAT,
     DOUBLE,
     TIMESTAMP_MILLIS,
+    TIMESTAMP_INT96,
     TIME_MICROS,
     UUID,
     DICTIONARY
@@ -166,6 +167,11 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
             .fixedWidthTypeBinaryBatchReader()
             .nextBatch(vec, typeWidth, nullabilityHolder);
         break;
+      case TIMESTAMP_INT96:
+        vectorizedColumnIterator
+            .timestampInt96BatchReader()
+            .nextBatch(vec, typeWidth, nullabilityHolder);
+        break;
       case BOOLEAN:
         vectorizedColumnIterator.booleanBatchReader().nextBatch(vec, -1, nullabilityHolder);
         break;
@@ -334,6 +340,14 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF
         vec.allocateNew();
         this.typeWidth = len;
         break;
+      case INT96:
+        int length = BigIntVector.TYPE_WIDTH;
+        this.readType = ReadType.TIMESTAMP_INT96;
+        this.vec = arrowField.createVector(rootAlloc);
+        vec.setInitialCapacity(batchSize * length);
+        vec.allocateNew();
+        this.typeWidth = length;
+        break;
       case BINARY:
         this.vec = arrowField.createVector(rootAlloc);
         // TODO: Possibly use the uncompressed page size info to set the initial capacity
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/TimestampUtil.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/TimestampUtil.java
new file mode 100644
index 000000000000..88e419014228
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/TimestampUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.arrow.vectorized.parquet; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +public class TimestampUtil { + + private TimestampUtil() {} + + private static final long UNIX_EPOCH_JULIAN = 2_440_588L; + + public static long extractTimestampInt96(ByteBuffer buffer) { + long timeOfDayNanos = buffer.getLong(); + int julianDay = buffer.getInt(); + return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) + + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + } +} diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index 094e306d5bf1..081a5b6fd7da 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -144,6 +144,20 @@ protected int nextBatchOf( } } + public class TimestampInt96BatchReader extends BatchReader { + @Override + protected int nextBatchOf( + final FieldVector vector, + final int expectedBatchSize, + final int numValsInVector, + final int typeWidth, + NullabilityHolder holder) { + return vectorizedPageIterator + .timestampInt96PageReader() + .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + } + } + public class FloatBatchReader extends BatchReader { @Override protected int nextBatchOf( @@ -286,6 +300,10 @@ public TimestampMillisBatchReader timestampMillisBatchReader() { return new TimestampMillisBatchReader(); } + public TimestampInt96BatchReader timestampInt96BatchReader() { + return new TimestampInt96BatchReader(); + } + public FloatBatchReader floatBatchReader() { return new FloatBatchReader(); } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java index 96071fe75ccc..ade1dcc388f2 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java @@ -19,6 +19,7 @@ package org.apache.iceberg.arrow.vectorized.parquet; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.DecimalVector; @@ -105,6 +106,17 @@ protected void nextVal( } } + class TimestampInt96DictEncodedReader extends BaseDictEncodedReader { + @Override + protected void nextVal( + FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) { + ByteBuffer buffer = + dict.decodeToBinary(currentVal).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + long timestampInt96 = TimestampUtil.extractTimestampInt96(buffer); + vector.getDataBuffer().setLong(idx, timestampInt96); + } + } + class IntegerDictEncodedReader extends BaseDictEncodedReader { @Override protected void nextVal( @@ -200,6 +212,10 @@ public TimestampMillisDictEncodedReader timestampMillisDictEncodedReader() { return new TimestampMillisDictEncodedReader(); } + public TimestampInt96DictEncodedReader timestampInt96DictEncodedReader() { + return new TimestampInt96DictEncodedReader(); + } + public IntegerDictEncodedReader integerDictEncodedReader() { return new 
IntegerDictEncodedReader(); } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java index 47d9ae1da82b..6547e2cdcfb0 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java @@ -283,6 +283,32 @@ protected void nextDictEncodedVal( } } + /** Method for reading a batch of values of TimestampInt96 data type. */ + class TimestampInt96PageReader extends BagePageReader { + @Override + protected void nextVal( + FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + vectorizedDefinitionLevelReader + .timestampInt96Reader() + .nextBatch(vector, numVals, typeWidth, batchSize, holder, plainValuesReader); + } + + @Override + protected void nextDictEncodedVal( + FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + vectorizedDefinitionLevelReader + .timestampInt96Reader() + .nextDictEncodedBatch( + vector, + numVals, + typeWidth, + batchSize, + holder, + dictionaryEncodedValuesReader, + dictionary); + } + } + /** Method for reading a batch of values of FLOAT data type. */ class FloatPageReader extends BagePageReader { @@ -539,6 +565,10 @@ TimestampMillisPageReader timestampMillisPageReader() { return new TimestampMillisPageReader(); } + TimestampInt96PageReader timestampInt96PageReader() { + return new TimestampInt96PageReader(); + } + FloatPageReader floatPageReader() { return new FloatPageReader(); } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java index a036aee9e683..47880687719f 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java @@ -19,6 +19,7 @@ package org.apache.iceberg.arrow.vectorized.parquet; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.BitVector; @@ -447,6 +448,42 @@ protected void nextDictEncodedVal( } } + class TimestampInt96Reader extends BaseReader { + @Override + protected void nextVal( + FieldVector vector, + int idx, + ValuesAsBytesReader valuesReader, + int typeWidth, + byte[] byteArray) { + ByteBuffer buffer = valuesReader.getBuffer(12); + long timestampInt96 = TimestampUtil.extractTimestampInt96(buffer); + vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96); + } + + @Override + protected void nextDictEncodedVal( + FieldVector vector, + int idx, + VectorizedDictionaryEncodedParquetValuesReader reader, + int numValuesToRead, + Dictionary dict, + NullabilityHolder nullabilityHolder, + int typeWidth, + Mode mode) { + if (Mode.RLE.equals(mode)) { + reader + .timestampInt96DictEncodedReader() + .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth); + } else if (Mode.PACKED.equals(mode)) { + ByteBuffer buffer = + dict.decodeToBinary(reader.readInteger()).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + long timestampInt96 = TimestampUtil.extractTimestampInt96(buffer); + 
vector.getDataBuffer().setLong(idx, timestampInt96); + } + } + } + class FixedWidthBinaryReader extends BaseReader { @Override protected void nextVal( @@ -777,6 +814,10 @@ TimestampMillisReader timestampMillisReader() { return new TimestampMillisReader(); } + TimestampInt96Reader timestampInt96Reader() { + return new TimestampInt96Reader(); + } + FixedWidthBinaryReader fixedWidthBinaryReader() { return new FixedWidthBinaryReader(); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java index b55ba0e2199a..9bd7220b905a 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java @@ -50,6 +50,11 @@ public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spe return TABLES.create(schema, spec, tableLocation); } + @Override + public void dropTable(TableIdentifier ident) { + TABLES.dropTable(tableLocation); + } + @Override public Table loadTable(TableIdentifier ident, String entriesSuffix) { return TABLES.load(loadLocation(ident, entriesSuffix)); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java index de26f5f82c49..6f3d8416a047 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java @@ -43,11 +43,10 @@ public static void start() { @After public void dropTable() throws IOException { - Table table = catalog.loadTable(currentIdentifier); - Path tablePath = new Path(table.location()); - FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf()); - fs.delete(tablePath, true); - catalog.dropTable(currentIdentifier, false); + if (!catalog.tableExists(currentIdentifier)) { + return; + } + dropTable(currentIdentifier); } @Override @@ -56,6 +55,15 @@ public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spe return TestIcebergSourceHiveTables.catalog.createTable(ident, schema, spec); } + @Override + public void dropTable(TableIdentifier ident) throws IOException { + Table table = catalog.loadTable(ident); + Path tablePath = new Path(table.location()); + FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf()); + fs.delete(tablePath, true); + catalog.dropTable(ident, false); + } + @Override public Table loadTable(TableIdentifier ident, String entriesSuffix) { TableIdentifier identifier = diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 0f6ae3f20d77..c5aee88a9a28 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -23,8 +23,11 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.LocalDateTime; +import 
java.time.ZoneOffset; import java.util.Comparator; import java.util.List; import java.util.StringJoiner; @@ -71,12 +74,15 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.spark.SparkException; +import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; import org.junit.Assert; import org.junit.Rule; @@ -112,6 +118,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase { public abstract String loadLocation(TableIdentifier ident); + public abstract void dropTable(TableIdentifier ident) throws IOException; + @Test public synchronized void testTablesSupport() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); @@ -1765,6 +1773,70 @@ public void testAllManifestTableSnapshotFiltering() throws Exception { } } + @Test + public void testTableWithInt96Timestamp() throws IOException { + try { + File parquetTableDir = temp.newFolder("table_timestamp_int96"); + String parquetTableLocation = parquetTableDir.toURI().toString(); + Schema schema = + new Schema( + optional(1, "id", Types.LongType.get()), + optional(2, "tmp_col", Types.TimestampType.withZone())); + spark.conf().set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), "INT96"); + + LocalDateTime start = LocalDateTime.of(2000, 1, 31, 0, 0, 0); + LocalDateTime end = LocalDateTime.of(2100, 1, 1, 0, 0, 0); + long startSec = start.toEpochSecond(ZoneOffset.UTC); + long endSec = end.toEpochSecond(ZoneOffset.UTC); + Column idColumn = functions.expr("id"); + Column secondsColumn = + functions.expr("(id % " + (endSec - startSec) + " + " + startSec + ")").as("seconds"); + Column timestampColumn = functions.expr("cast( seconds as timestamp) as tmp_col"); + + for (Boolean useDict : new Boolean[] {true, false}) { + for (Boolean useVectorization : new Boolean[] {true, false}) { + spark.sql("DROP TABLE IF EXISTS parquet_table"); + spark.range(0, 5000, 100, 1) + .select(idColumn, secondsColumn) + .select(idColumn, timestampColumn) + .write() + .format("parquet") + .option("parquet.enable.dictionary", useDict) + .mode("overwrite") + .option("path", parquetTableLocation) + .saveAsTable("parquet_table"); + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table_with_timestamp_int96"); + Table table = createTable(tableIdentifier, schema, PartitionSpec.unpartitioned()); + table + .updateProperties() + .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, useVectorization.toString()) + .commit(); + + String stagingLocation = table.location() + "/metadata"; + SparkTableUtil.importSparkTable( + spark, + new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), + table, + stagingLocation); + + // validate we get the expected results back + List expected = spark.table("parquet_table").select("tmp_col").collectAsList(); + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier)) + .select("tmp_col") + .collectAsList(); + Assert.assertEquals("Rows must match", expected, actual); + dropTable(tableIdentifier); + } + } + } finally { + spark.sql("drop table if exists parquet_table"); + } + } + private GenericData.Record manifestRecord( Table manifestTable, Long referenceSnapshotId, ManifestFile 
manifest) { GenericRecordBuilder builder = From ef7a744770e0059d62f091392c3e38c6183760df Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Wed, 1 Mar 2023 13:45:06 +0800 Subject: [PATCH 02/12] Support vectorized reading int96 timestamps in imported data --- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index c5aee88a9a28..cdfbb71b21dc 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1796,7 +1796,8 @@ public void testTableWithInt96Timestamp() throws IOException { for (Boolean useDict : new Boolean[] {true, false}) { for (Boolean useVectorization : new Boolean[] {true, false}) { spark.sql("DROP TABLE IF EXISTS parquet_table"); - spark.range(0, 5000, 100, 1) + spark + .range(0, 5000, 100, 1) .select(idColumn, secondsColumn) .select(idColumn, timestampColumn) .write() From 2695f2568ee23549828ec38a652be052d278c218 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Tue, 14 Mar 2023 23:14:12 +0800 Subject: [PATCH 03/12] fix comments --- .../vectorized/VectorizedArrowReader.java | 26 +++++++++---------- ...ectorizedParquetDefinitionLevelReader.java | 3 ++- .../spark/data/SparkParquetReaders.java | 7 +---- .../spark/data/SparkParquetReaders.java | 7 +---- .../spark/data/SparkParquetReaders.java | 7 +---- .../spark/data/SparkParquetReaders.java | 9 ++----- 6 files changed, 20 insertions(+), 39 deletions(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 2d1bc16208f7..1823b67b4a68 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -167,11 +167,6 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { .fixedWidthTypeBinaryBatchReader() .nextBatch(vec, typeWidth, nullabilityHolder); break; - case TIMESTAMP_INT96: - vectorizedColumnIterator - .timestampInt96BatchReader() - .nextBatch(vec, typeWidth, nullabilityHolder); - break; case BOOLEAN: vectorizedColumnIterator.booleanBatchReader().nextBatch(vec, -1, nullabilityHolder); break; @@ -198,6 +193,11 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { .timestampMillisBatchReader() .nextBatch(vec, typeWidth, nullabilityHolder); break; + case TIMESTAMP_INT96: + vectorizedColumnIterator + .timestampInt96BatchReader() + .nextBatch(vec, typeWidth, nullabilityHolder); + break; case UUID: vectorizedColumnIterator .fixedSizeBinaryBatchReader() @@ -340,14 +340,6 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF vec.allocateNew(); this.typeWidth = len; break; - case INT96: - int length = BigIntVector.TYPE_WIDTH; - this.readType = ReadType.TIMESTAMP_INT96; - this.vec = arrowField.createVector(rootAlloc); - vec.setInitialCapacity(batchSize * length); - vec.allocateNew(); - this.typeWidth = length; - break; case BINARY: this.vec = arrowField.createVector(rootAlloc); // TODO: Possibly use the uncompressed page size info to set the initial capacity @@ -368,6 +360,14 @@ private void 
allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF this.readType = ReadType.INT; this.typeWidth = (int) IntVector.TYPE_WIDTH; break; + case INT96: + int length = BigIntVector.TYPE_WIDTH; + this.readType = ReadType.TIMESTAMP_INT96; + this.vec = arrowField.createVector(rootAlloc); + vec.setInitialCapacity(batchSize * length); + vec.allocateNew(); + this.typeWidth = length; + break; case FLOAT: Field floatField = new Field( diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java index 47880687719f..672892bea1c1 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java @@ -456,7 +456,8 @@ protected void nextVal( ValuesAsBytesReader valuesReader, int typeWidth, byte[] byteArray) { - ByteBuffer buffer = valuesReader.getBuffer(12); + // 8 bytes (time of day nanos) + 4 bytes(julianDay) = 12 bytes + ByteBuffer buffer = valuesReader.getBuffer(12).order(ByteOrder.LITTLE_ENDIAN); long timestampInt96 = TimestampUtil.extractTimestampInt96(buffer); vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96); } diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 836ac46d19d7..8e8c764009bd 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -377,7 +377,6 @@ public long readLong() { } private static class TimestampInt96Reader extends UnboxedReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { super(desc); @@ -392,11 +391,7 @@ public Long read(Long ignored) { public long readLong() { final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + return TimestampUtil.extractTimestampInt96(byteBuffer); } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 5dfb51e14ba1..200480d80401 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -378,7 +378,6 @@ public long readLong() { } private static class TimestampInt96Reader extends UnboxedReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { super(desc); @@ -393,11 +392,7 @@ public Long read(Long ignored) { public long readLong() { final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + return 
TimestampUtil.extractTimestampInt96(byteBuffer); } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 76e780f7a065..59e765bb226f 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -377,7 +377,6 @@ public long readLong() { } private static class TimestampInt96Reader extends UnboxedReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { super(desc); @@ -392,11 +391,7 @@ public Long read(Long ignored) { public long readLong() { final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + return TimestampUtil.extractTimestampInt96(byteBuffer); } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 76e780f7a065..ae242acc6f6a 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -25,9 +25,9 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; @@ -377,7 +377,6 @@ public long readLong() { } private static class TimestampInt96Reader extends UnboxedReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { super(desc); @@ -392,11 +391,7 @@ public Long read(Long ignored) { public long readLong() { final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + return TimestampUtil.extractTimestampInt96(byteBuffer); } } From 11f0a71042a38046bbc3b59ee841214c45a52ac4 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Tue, 14 Mar 2023 23:23:24 +0800 Subject: [PATCH 04/12] code style --- .../java/org/apache/iceberg/spark/data/SparkParquetReaders.java | 1 - .../java/org/apache/iceberg/spark/data/SparkParquetReaders.java | 1 - .../java/org/apache/iceberg/spark/data/SparkParquetReaders.java | 1 - 3 files changed, 3 deletions(-) diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 8e8c764009bd..d6078d08d89d 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -25,7 +25,6 @@ import java.util.Arrays; import 
java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 200480d80401..e7c2de244f4d 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 59e765bb226f..6dc439970878 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; From ab008f66104406a173d38f7a1165af2dd7d674cc Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Wed, 15 Mar 2023 08:19:42 +0800 Subject: [PATCH 05/12] import util --- .../java/org/apache/iceberg/spark/data/SparkParquetReaders.java | 1 + .../java/org/apache/iceberg/spark/data/SparkParquetReaders.java | 1 + .../java/org/apache/iceberg/spark/data/SparkParquetReaders.java | 1 + 3 files changed, 3 insertions(+) diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index d6078d08d89d..90dae424ab23 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index e7c2de244f4d..da7f2ff179ce 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 6dc439970878..ae242acc6f6a 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; From d7a07457287a3d6b6ad4dd598d9eb7b828d156c2 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Tue, 4 Apr 2023 00:11:26 +0800 Subject: [PATCH 06/12] update comments --- .../GenericArrowVectorAccessorFactory.java | 7 +- .../vectorized/VectorizedArrowReader.java | 3 + .../vectorized/parquet/TimestampUtil.java | 36 --- ...dDictionaryEncodedParquetValuesReader.java | 3 +- ...ectorizedParquetDefinitionLevelReader.java | 5 +- .../apache/iceberg/parquet/ParquetUtil.java | 16 ++ .../spark/data/SparkParquetReaders.java | 4 +- .../source/TestIcebergSourceTablesBase.java | 232 +++++++++--------- 8 files changed, 142 insertions(+), 164 deletions(-) delete mode 100644 arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/TimestampUtil.java diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java index 56313144df70..a1f5763b9f1e 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java @@ -43,7 +43,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.util.DecimalUtility; -import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; @@ -159,6 +159,9 @@ public ArrowVectorAccessor getVecto case INT64: return new DictionaryLongAccessor<>((IntVector) vector, dictionary); case INT96: + // Impala & Spark used to write timestamps as INT96 default. For backwards compatibility + // we try to read INT96 as timestamps. 
But INT96 is not recommended and deprecated + // (see https://issues.apache.org/jira/browse/PARQUET-323) return new DictionaryTimestampInt96Accessor<>((IntVector) vector, dictionary); case DOUBLE: return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary); @@ -461,7 +464,7 @@ public final long getLong(int rowId) { .decodeToBinary(offsetVector.get(rowId)) .toByteBuffer() .order(ByteOrder.LITTLE_ENDIAN); - return TimestampUtil.extractTimestampInt96(byteBuffer); + return ParquetUtil.extractTimestampInt96(byteBuffer); } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 1823b67b4a68..62ad87dfdc71 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -361,6 +361,9 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF this.typeWidth = (int) IntVector.TYPE_WIDTH; break; case INT96: + // Impala & Spark used to write timestamps as INT96 default. For backwards compatibility + // we try to read INT96 as timestamps. But INT96 is not recommended and deprecated + // (see https://issues.apache.org/jira/browse/PARQUET-323) int length = BigIntVector.TYPE_WIDTH; this.readType = ReadType.TIMESTAMP_INT96; this.vec = arrowField.createVector(rootAlloc); diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/TimestampUtil.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/TimestampUtil.java deleted file mode 100644 index 88e419014228..000000000000 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/TimestampUtil.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
-package org.apache.iceberg.arrow.vectorized.parquet;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-
-public class TimestampUtil {
-
-  private TimestampUtil() {}
-
-  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
-
-  public static long extractTimestampInt96(ByteBuffer buffer) {
-    long timeOfDayNanos = buffer.getLong();
-    int julianDay = buffer.getInt();
-    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
-        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
-  }
-}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index ade1dcc388f2..55f1d3fd7908 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -27,6 +27,7 @@
 import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.parquet.column.Dictionary;

 /**
@@ -112,7 +113,7 @@ protected void nextVal(
         FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) {
       ByteBuffer buffer =
           dict.decodeToBinary(currentVal).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      long timestampInt96 = TimestampUtil.extractTimestampInt96(buffer);
+      long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
       vector.getDataBuffer().setLong(idx, timestampInt96);
     }
   }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 672892bea1c1..ba86c7d7e183 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -30,6 +30,7 @@
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ValuesAsBytesReader;
 import org.apache.parquet.column.Dictionary;

@@ -458,7 +459,7 @@ protected void nextVal(
         byte[] byteArray) {
       // 8 bytes (time of day nanos) + 4 bytes(julianDay) = 12 bytes
       ByteBuffer buffer = valuesReader.getBuffer(12).order(ByteOrder.LITTLE_ENDIAN);
-      long timestampInt96 = TimestampUtil.extractTimestampInt96(buffer);
+      long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
       vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96);
     }

@@ -479,7 +480,7 @@ protected void nextDictEncodedVal(
       } else if (Mode.PACKED.equals(mode)) {
         ByteBuffer buffer =
             dict.decodeToBinary(reader.readInteger()).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-        long timestampInt96 = TimestampUtil.extractTimestampInt96(buffer);
+        long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
         vector.getDataBuffer().setLong(idx, timestampInt96);
       }
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index b6d57d073e0e..4d4b4ad0c0bf 100644
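The ParquetUtil hunk below becomes the single home of the INT96 decoding that the Arrow and Spark readers share after this patch. To make the byte layout and the Julian-day arithmetic easy to check in isolation, here is a minimal, self-contained sketch (a hypothetical demo class, not part of the patch series) that mirrors the extractTimestampInt96 logic and decodes one hand-built value:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the patch: it restates the
// extractTimestampInt96 arithmetic so it can be sanity-checked standalone.
public class Int96DecodeDemo {
  // Julian day number of the Unix epoch, 1970-01-01.
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  static long decode(ByteBuffer buffer) {
    long timeOfDayNanos = buffer.getLong(); // first 8 bytes: nanos within the day
    int julianDay = buffer.getInt(); // last 4 bytes: Julian day number
    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
  }

  public static void main(String[] args) {
    // Build the 12-byte little-endian INT96 value for 1970-01-02T00:00:01Z:
    // one second of nanos, Julian day 2_440_589 (one day after the epoch).
    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(1_000_000_000L);
    buf.putInt(2_440_589);
    buf.flip();
    System.out.println(decode(buf)); // prints 86401000000
  }
}

The printed value is DAYS.toMicros(1) + NANOSECONDS.toMicros(1_000_000_000) = 86_400_000_000 + 1_000_000 microseconds since the epoch, which is exactly the long that the vectorized readers write into the 8-byte-wide Arrow vector.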
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -68,6 +69,8 @@ public class ParquetUtil {
   // not meant to be instantiated
   private ParquetUtil() {}

+  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
   public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) {
     return fileMetrics(file, metricsConfig, null);
   }
@@ -403,4 +406,17 @@ public static boolean isIntType(PrimitiveType primitiveType) {
     }
     return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32;
   }
+
+  /**
+   * Method to read timestamp (parquet Int96) from bytebuffer.
+   * Read 12 bytes in byteBuffer: 8 bytes (time of day nanos) + 4 bytes(julianDay)
+   */
+  public static long extractTimestampInt96(ByteBuffer buffer) {
+    // 8 bytes (time of day nanos)
+    long timeOfDayNanos = buffer.getLong();
+    // 4 bytes(julianDay)
+    int julianDay = buffer.getInt();
+    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
+        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index ae242acc6f6a..59f81de6ae4a 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -27,8 +27,8 @@
 import java.util.Map;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -391,7 +391,7 @@ public Long read(Long ignored) {
     public long readLong() {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      return TimestampUtil.extractTimestampInt96(byteBuffer);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
     }
   }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index cdfbb71b21dc..beeca265408a 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -84,6 +84,7 @@
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -120,6 +121,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   public abstract String loadLocation(TableIdentifier ident);

   public abstract void dropTable(TableIdentifier ident) throws IOException;

+  @After
+  public void removeTable() {
+    spark.sql("DROP TABLE IF EXISTS parquet_table");
+  }
+
   @Test
   public synchronized void testTablesSupport() {
     TableIdentifier tableIdentifier =
TableIdentifier.of("db", "table"); @@ -479,8 +485,6 @@ public void testFilesTable() throws Exception { @Test public void testFilesTableWithSnapshotIdInheritance() throws Exception { - spark.sql("DROP TABLE IF EXISTS parquet_table"); - TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test"); Table table = createTable(tableIdentifier, SCHEMA, SPEC); table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit(); @@ -503,45 +507,39 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit(); - try { - String stagingLocation = table.location() + "/metadata"; - SparkTableUtil.importSparkTable( - spark, - new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), - table, - stagingLocation); - - Dataset filesTableDs = - spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")); - List actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList(); - - List expected = Lists.newArrayList(); - for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) { - InputFile in = table.io().newInputFile(manifest.path()); - try (CloseableIterable rows = - Avro.read(in).project(entriesTable.schema()).build()) { - for (GenericData.Record record : rows) { - GenericData.Record file = (GenericData.Record) record.get("data_file"); - TestHelpers.asMetadataRecord(file); - expected.add(file); - } + String stagingLocation = table.location() + "/metadata"; + SparkTableUtil.importSparkTable( + spark, + new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), + table, + stagingLocation); + + Dataset filesTableDs = + spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")); + List actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList(); + + List expected = Lists.newArrayList(); + for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) { + InputFile in = table.io().newInputFile(manifest.path()); + try (CloseableIterable rows = + Avro.read(in).project(entriesTable.schema()).build()) { + for (GenericData.Record record : rows) { + GenericData.Record file = (GenericData.Record) record.get("data_file"); + TestHelpers.asMetadataRecord(file); + expected.add(file); } } - - Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs); - Assert.assertEquals("Files table should have one row", 2, expected.size()); - Assert.assertEquals("Actual results should have one row", 2, actual.size()); - TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0)); - TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1)); - } finally { - spark.sql("DROP TABLE parquet_table"); } + + Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs); + Assert.assertEquals("Files table should have one row", 2, expected.size()); + Assert.assertEquals("Actual results should have one row", 2, actual.size()); + TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0)); + TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1)); } @Test public void testEntriesTableWithSnapshotIdInheritance() throws Exception { - spark.sql("DROP TABLE IF EXISTS parquet_table"); - TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_inheritance_test"); PartitionSpec spec = SPEC; Table table = createTable(tableIdentifier, SCHEMA, spec); @@ -560,34 +558,30 @@ public void testEntriesTableWithSnapshotIdInheritance() throws 
Exception { Dataset inputDF = spark.createDataFrame(records, SimpleRecord.class); inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table"); - try { - String stagingLocation = table.location() + "/metadata"; - SparkTableUtil.importSparkTable( - spark, - new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), - table, - stagingLocation); - - List actual = - spark - .read() - .format("iceberg") - .load(loadLocation(tableIdentifier, "entries")) - .select("sequence_number", "snapshot_id", "data_file") - .collectAsList(); - - table.refresh(); - - long snapshotId = table.currentSnapshot().snapshotId(); - - Assert.assertEquals("Entries table should have 2 rows", 2, actual.size()); - Assert.assertEquals("Sequence number must match", 0, actual.get(0).getLong(0)); - Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(0).getLong(1)); - Assert.assertEquals("Sequence number must match", 0, actual.get(1).getLong(0)); - Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(1).getLong(1)); - } finally { - spark.sql("DROP TABLE parquet_table"); - } + String stagingLocation = table.location() + "/metadata"; + SparkTableUtil.importSparkTable( + spark, + new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), + table, + stagingLocation); + + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "entries")) + .select("sequence_number", "snapshot_id", "data_file") + .collectAsList(); + + table.refresh(); + + long snapshotId = table.currentSnapshot().snapshotId(); + + Assert.assertEquals("Entries table should have 2 rows", 2, actual.size()); + Assert.assertEquals("Sequence number must match", 0, actual.get(0).getLong(0)); + Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(0).getLong(1)); + Assert.assertEquals("Sequence number must match", 0, actual.get(1).getLong(0)); + Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(1).getLong(1)); } @Test @@ -1775,66 +1769,62 @@ public void testAllManifestTableSnapshotFiltering() throws Exception { @Test public void testTableWithInt96Timestamp() throws IOException { - try { - File parquetTableDir = temp.newFolder("table_timestamp_int96"); - String parquetTableLocation = parquetTableDir.toURI().toString(); - Schema schema = - new Schema( - optional(1, "id", Types.LongType.get()), - optional(2, "tmp_col", Types.TimestampType.withZone())); - spark.conf().set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), "INT96"); - - LocalDateTime start = LocalDateTime.of(2000, 1, 31, 0, 0, 0); - LocalDateTime end = LocalDateTime.of(2100, 1, 1, 0, 0, 0); - long startSec = start.toEpochSecond(ZoneOffset.UTC); - long endSec = end.toEpochSecond(ZoneOffset.UTC); - Column idColumn = functions.expr("id"); - Column secondsColumn = - functions.expr("(id % " + (endSec - startSec) + " + " + startSec + ")").as("seconds"); - Column timestampColumn = functions.expr("cast( seconds as timestamp) as tmp_col"); - - for (Boolean useDict : new Boolean[] {true, false}) { - for (Boolean useVectorization : new Boolean[] {true, false}) { - spark.sql("DROP TABLE IF EXISTS parquet_table"); - spark - .range(0, 5000, 100, 1) - .select(idColumn, secondsColumn) - .select(idColumn, timestampColumn) - .write() - .format("parquet") - .option("parquet.enable.dictionary", useDict) - .mode("overwrite") - .option("path", parquetTableLocation) - .saveAsTable("parquet_table"); - TableIdentifier tableIdentifier = TableIdentifier.of("db", "table_with_timestamp_int96"); - Table 
table = createTable(tableIdentifier, schema, PartitionSpec.unpartitioned()); - table - .updateProperties() - .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, useVectorization.toString()) - .commit(); - - String stagingLocation = table.location() + "/metadata"; - SparkTableUtil.importSparkTable( - spark, - new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), - table, - stagingLocation); - - // validate we get the expected results back - List expected = spark.table("parquet_table").select("tmp_col").collectAsList(); - List actual = - spark - .read() - .format("iceberg") - .load(loadLocation(tableIdentifier)) - .select("tmp_col") - .collectAsList(); - Assert.assertEquals("Rows must match", expected, actual); - dropTable(tableIdentifier); - } + File parquetTableDir = temp.newFolder("table_timestamp_int96"); + String parquetTableLocation = parquetTableDir.toURI().toString(); + Schema schema = + new Schema( + optional(1, "id", Types.LongType.get()), + optional(2, "tmp_col", Types.TimestampType.withZone())); + spark.conf().set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), "INT96"); + + LocalDateTime start = LocalDateTime.of(2000, 1, 31, 0, 0, 0); + LocalDateTime end = LocalDateTime.of(2100, 1, 1, 0, 0, 0); + long startSec = start.toEpochSecond(ZoneOffset.UTC); + long endSec = end.toEpochSecond(ZoneOffset.UTC); + Column idColumn = functions.expr("id"); + Column secondsColumn = + functions.expr("(id % " + (endSec - startSec) + " + " + startSec + ")").as("seconds"); + Column timestampColumn = functions.expr("cast( seconds as timestamp) as tmp_col"); + + for (Boolean useDict : new Boolean[] {true, false}) { + for (Boolean useVectorization : new Boolean[] {true, false}) { + spark.sql("DROP TABLE IF EXISTS parquet_table"); + spark + .range(0, 5000, 100, 1) + .select(idColumn, secondsColumn) + .select(idColumn, timestampColumn) + .write() + .format("parquet") + .option("parquet.enable.dictionary", useDict) + .mode("overwrite") + .option("path", parquetTableLocation) + .saveAsTable("parquet_table"); + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table_with_timestamp_int96"); + Table table = createTable(tableIdentifier, schema, PartitionSpec.unpartitioned()); + table + .updateProperties() + .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, useVectorization.toString()) + .commit(); + + String stagingLocation = table.location() + "/metadata"; + SparkTableUtil.importSparkTable( + spark, + new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), + table, + stagingLocation); + + // validate we get the expected results back + List expected = spark.table("parquet_table").select("tmp_col").collectAsList(); + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier)) + .select("tmp_col") + .collectAsList(); + Assert.assertEquals("Rows must match", expected, actual); + dropTable(tableIdentifier); } - } finally { - spark.sql("drop table if exists parquet_table"); } } From 5191bcd91ba56c33f8ff6ddf9c26745974da6c84 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Tue, 4 Apr 2023 00:13:34 +0800 Subject: [PATCH 07/12] update comments --- .../apache/iceberg/spark/source/TestIcebergSourceHiveTables.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java index 6f3d8416a047..6292a2c1a834 100644 --- 
From 5191bcd91ba56c33f8ff6ddf9c26745974da6c84 Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Tue, 4 Apr 2023 00:13:34 +0800
Subject: [PATCH 07/12] update comments

---
 .../apache/iceberg/spark/source/TestIcebergSourceHiveTables.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index 6f3d8416a047..6292a2c1a834 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -46,6 +46,7 @@ public void dropTable() throws IOException {
     if (!catalog.tableExists(currentIdentifier)) {
       return;
     }
+    dropTable(currentIdentifier);
   }

From d0f0fb5fffce416146b516eda09d04c129335b4a Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Tue, 4 Apr 2023 00:29:16 +0800
Subject: [PATCH 08/12] code style

---
 .../src/main/java/org/apache/iceberg/parquet/ParquetUtil.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 4d4b4ad0c0bf..a879fc5f51d1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -408,8 +408,8 @@ public static boolean isIntType(PrimitiveType primitiveType) {
   }
 
   /**
-   * Method to read timestamp (parquet Int96) from bytebuffer.
-   * Read 12 bytes in byteBuffer: 8 bytes (time of day nanos) + 4 bytes(julianDay)
+   * Method to read timestamp (parquet Int96) from bytebuffer. Read 12 bytes in byteBuffer: 8 bytes
+   * (time of day nanos) + 4 bytes(julianDay)
    */
   public static long extractTimestampInt96(ByteBuffer buffer) {
     // 8 bytes (time of day nanos)
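Patch 08's reflowed Javadoc states the full layout: 12 bytes, the low 8 holding nanoseconds within the day and the high 4 holding a Julian day number, both little-endian. For readers who want the arithmetic spelled out, here is a self-contained sketch of that conversion (constant and method names are illustrative; the in-tree ParquetUtil implementation may be organized differently):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class Int96Sample {
      // 1970-01-01 expressed as a Julian day number.
      private static final long UNIX_EPOCH_JULIAN_DAY = 2_440_588L;
      private static final long MICROS_PER_DAY = 86_400L * 1_000_000L;

      // Reads the documented layout and returns microseconds since the Unix epoch.
      static long toEpochMicros(ByteBuffer buffer) {
        long timeOfDayNanos = buffer.getLong(); // bytes 0-7
        int julianDay = buffer.getInt();        // bytes 8-11
        return (julianDay - UNIX_EPOCH_JULIAN_DAY) * MICROS_PER_DAY + timeOfDayNanos / 1_000;
      }

      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        buf.putLong(0L);       // midnight
        buf.putInt(2_440_588); // Julian day of 1970-01-01
        buf.flip();
        System.out.println(toEpochMicros(buf)); // prints 0
      }
    }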
From 5d38dc71f764dcf61a7e5098dfff0a2c6f81c164 Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Sun, 9 Apr 2023 00:00:10 +0800
Subject: [PATCH 09/12] fix spark 2.4 & 3.1

---
 .../org/apache/iceberg/spark/data/SparkParquetReaders.java | 4 ++--
 .../org/apache/iceberg/spark/data/SparkParquetReaders.java | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 90dae424ab23..bba68684a303 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -27,8 +27,8 @@ import java.util.Map;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -391,7 +391,7 @@ public Long read(Long ignored) {
     public long readLong() {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      return TimestampUtil.extractTimestampInt96(byteBuffer);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
     }
   }

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index da7f2ff179ce..940e7934ec81 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -27,8 +27,8 @@ import java.util.Map;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -392,7 +392,7 @@ public Long read(Long ignored) {
     public long readLong() {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      return TimestampUtil.extractTimestampInt96(byteBuffer);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
     }
   }

From b2f75baf2d4edc91c45100c3a9192946a4d67f42 Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Sun, 9 Apr 2023 07:58:19 +0800
Subject: [PATCH 10/12] fix import error in spark3.2

---
 .../org/apache/iceberg/spark/data/SparkParquetReaders.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index ae242acc6f6a..59f81de6ae4a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -27,8 +27,8 @@ import java.util.Map;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.arrow.vectorized.parquet.TimestampUtil;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -391,7 +391,7 @@ public Long read(Long ignored) {
     public long readLong() {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      return TimestampUtil.extractTimestampInt96(byteBuffer);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
     }
   }
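A detail worth calling out in all three readLong() bodies above: the .order(ByteOrder.LITTLE_ENDIAN) call is load-bearing, because java.nio.ByteBuffer defaults to big-endian while Parquet serializes INT96 little-endian. A short standalone demonstration of what would go wrong without it:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class ByteOrderSample {
      public static void main(String[] args) {
        byte[] raw = {1, 0, 0, 0, 0, 0, 0, 0};
        // The default (big-endian) read treats the first byte as most significant:
        System.out.println(ByteBuffer.wrap(raw).getLong()); // 72057594037927936
        // The little-endian read recovers the intended value:
        System.out.println(ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN).getLong()); // 1
      }
    }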
From a92561f12845e15e78d8110dec2ea3c14ecd9eb7 Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Sun, 9 Apr 2023 11:56:42 +0800
Subject: [PATCH 11/12] fix comments

---
 .../GenericArrowVectorAccessorFactory.java    |  6 ++---
 .../vectorized/VectorizedArrowReader.java     |  6 ++---
 ...ectorizedParquetDefinitionLevelReader.java | 25 ++++++++++++-------
 3 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index a1f5763b9f1e..939f34eeb0f8 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -159,9 +159,9 @@ public ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getVecto
       case INT64:
         return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
       case INT96:
-        // Impala & Spark used to write timestamps as INT96 default. For backwards compatibility
-        // we try to read INT96 as timestamps. But INT96 is not recommended and deprecated
-        // (see https://issues.apache.org/jira/browse/PARQUET-323)
+        // Impala & Spark used to write timestamps as INT96 by default. For backwards
+        // compatibility we try to read INT96 as timestamps. But INT96 is not recommended
+        // and deprecated (see https://issues.apache.org/jira/browse/PARQUET-323)
         return new DictionaryTimestampInt96Accessor<>((IntVector) vector, dictionary);
       case DOUBLE:
         return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary);

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 62ad87dfdc71..8c265d4f55d8 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -361,9 +361,9 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF
         this.typeWidth = (int) IntVector.TYPE_WIDTH;
         break;
       case INT96:
-        // Impala & Spark used to write timestamps as INT96 default. For backwards compatibility
-        // we try to read INT96 as timestamps. But INT96 is not recommended and deprecated
-        // (see https://issues.apache.org/jira/browse/PARQUET-323)
+        // Impala & Spark used to write timestamps as INT96 by default. For backwards
+        // compatibility we try to read INT96 as timestamps. But INT96 is not recommended
+        // and deprecated (see https://issues.apache.org/jira/browse/PARQUET-323)
         int length = BigIntVector.TYPE_WIDTH;
         this.readType = ReadType.TIMESTAMP_INT96;
         this.vec = arrowField.createVector(rootAlloc);

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index ba86c7d7e183..55fead41dafa 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -473,15 +473,22 @@ protected void nextDictEncodedVal(
         NullabilityHolder nullabilityHolder,
         int typeWidth,
         Mode mode) {
-      if (Mode.RLE.equals(mode)) {
-        reader
-            .timestampInt96DictEncodedReader()
-            .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth);
-      } else if (Mode.PACKED.equals(mode)) {
-        ByteBuffer buffer =
-            dict.decodeToBinary(reader.readInteger()).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-        long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
-        vector.getDataBuffer().setLong(idx, timestampInt96);
+      switch (mode) {
+        case RLE:
+          reader
+              .timestampInt96DictEncodedReader()
+              .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth);
+          break;
+        case PACKED:
+          ByteBuffer buffer =
+              dict.decodeToBinary(reader.readInteger())
+                  .toByteBuffer()
+                  .order(ByteOrder.LITTLE_ENDIAN);
+          long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
+          vector.getDataBuffer().setLong(idx, timestampInt96);
+          break;
+        default:
+          throw new UnsupportedOperationException("Not a supported mode: " + mode);
       }
     }
   }
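The switch makes the two dictionary modes easier to compare: in PACKED, every value carries its own dictionary id and the 12-byte entry is decoded per value, while in RLE a whole run shares one id, so timestampInt96DictEncodedReader() can decode once and fan the result out. A conceptual sketch of that amortization (fillRun and its parameters are illustrative, not the actual reader internals):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class DictRunSample {
      private static final long UNIX_EPOCH_JULIAN_DAY = 2_440_588L;
      private static final long MICROS_PER_DAY = 86_400L * 1_000_000L;

      // One RLE run: decode the shared dictionary entry once, write it runLength times.
      static void fillRun(long[] vectorData, int startIdx, int runLength, ByteBuffer dictEntry) {
        ByteBuffer buffer = dictEntry.order(ByteOrder.LITTLE_ENDIAN);
        long micros =
            (buffer.getInt(8) - UNIX_EPOCH_JULIAN_DAY) * MICROS_PER_DAY
                + buffer.getLong(0) / 1_000;
        for (int i = 0; i < runLength; i++) {
          vectorData[startIdx + i] = micros;
        }
      }
    }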
From 80f506307d0de9788db4f5066ac96d0047b0eece Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Sun, 9 Apr 2023 15:00:43 +0800
Subject: [PATCH 12/12] fix comments

---
 .../parquet/VectorizedParquetDefinitionLevelReader.java   | 3 ++-
 .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 5 ++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 55fead41dafa..20d7f804978c 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -488,7 +488,8 @@ protected void nextDictEncodedVal(
           vector.getDataBuffer().setLong(idx, timestampInt96);
           break;
         default:
-          throw new UnsupportedOperationException("Not a supported mode: " + mode);
+          throw new UnsupportedOperationException(
+              "Unsupported mode for timestamp int96 reader: " + mode);
       }
     }
   }

diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index beeca265408a..5b34dd269fbd 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -84,6 +84,7 @@ import org.apache.spark.sql.functions;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -1822,7 +1823,9 @@ public void testTableWithInt96Timestamp() throws IOException {
             .load(loadLocation(tableIdentifier))
             .select("tmp_col")
             .collectAsList();
-        Assert.assertEquals("Rows must match", expected, actual);
+        Assertions.assertThat(actual)
+            .as("Rows must match")
+            .containsExactlyInAnyOrderElementsOf(expected);
         dropTable(tableIdentifier);
       }
     }
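The assertion change in the last patch is more than style: rows coming back from the Iceberg scan are not guaranteed to arrive in the same order as the rows read from parquet_table, so an order-sensitive list comparison could fail intermittently. The AssertJ matcher only checks that the two collections hold the same elements; a minimal illustration:

    import java.util.Arrays;
    import java.util.List;
    import org.assertj.core.api.Assertions;

    public class AssertOrderSample {
      public static void main(String[] args) {
        List<Integer> expected = Arrays.asList(1, 2, 3);
        List<Integer> actual = Arrays.asList(3, 1, 2); // same rows, different order
        // An order-sensitive equals would fail here; the order-insensitive check passes.
        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
      }
    }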