diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java index 83f3f62e7018..939f34eeb0f8 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java @@ -21,6 +21,7 @@ import java.lang.reflect.Array; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.function.IntFunction; import java.util.function.Supplier; import java.util.stream.IntStream; @@ -42,6 +43,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.util.DecimalUtility; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; @@ -156,6 +158,11 @@ public ArrowVectorAccessor getVecto return new DictionaryFloatAccessor<>((IntVector) vector, dictionary); case INT64: return new DictionaryLongAccessor<>((IntVector) vector, dictionary); + case INT96: + // Impala & Spark used to write timestamps as INT96 by default. For backwards + // compatibility we try to read INT96 as timestamps. But INT96 is not recommended + // and deprecated (see https://issues.apache.org/jira/browse/PARQUET-323) + return new DictionaryTimestampInt96Accessor<>((IntVector) vector, dictionary); case DOUBLE: return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary); default: @@ -438,6 +445,29 @@ public final byte[] getBinary(int rowId) { } } + private static class DictionaryTimestampInt96Accessor< + DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable> + extends ArrowVectorAccessor { + private final IntVector offsetVector; + private final Dictionary dictionary; + + DictionaryTimestampInt96Accessor(IntVector vector, Dictionary dictionary) { + super(vector); + this.offsetVector = vector; + this.dictionary = dictionary; + } + + @Override + public final long getLong(int rowId) { + ByteBuffer byteBuffer = + dictionary + .decodeToBinary(offsetVector.get(rowId)) + .toByteBuffer() + .order(ByteOrder.LITTLE_ENDIAN); + return ParquetUtil.extractTimestampInt96(byteBuffer); + } + } + private static class DateAccessor< DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable> extends ArrowVectorAccessor { diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 79cbfb34bd54..8c265d4f55d8 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -112,6 +112,7 @@ private enum ReadType { FLOAT, DOUBLE, TIMESTAMP_MILLIS, + TIMESTAMP_INT96, TIME_MICROS, UUID, DICTIONARY @@ -192,6 +193,11 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { .timestampMillisBatchReader() .nextBatch(vec, typeWidth, nullabilityHolder); break; + case TIMESTAMP_INT96: + vectorizedColumnIterator + .timestampInt96BatchReader() + .nextBatch(vec, typeWidth, nullabilityHolder); + break; case UUID: vectorizedColumnIterator .fixedSizeBinaryBatchReader() @@ -354,6 +360,17 @@ private void 
allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF this.readType = ReadType.INT; this.typeWidth = (int) IntVector.TYPE_WIDTH; break; + case INT96: + // Impala & Spark used to write timestamps as INT96 by default. For backwards + // compatibility we try to read INT96 as timestamps. But INT96 is not recommended + // and deprecated (see https://issues.apache.org/jira/browse/PARQUET-323) + int length = BigIntVector.TYPE_WIDTH; + this.readType = ReadType.TIMESTAMP_INT96; + this.vec = arrowField.createVector(rootAlloc); + vec.setInitialCapacity(batchSize * length); + vec.allocateNew(); + this.typeWidth = length; + break; case FLOAT: Field floatField = new Field( diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index 094e306d5bf1..081a5b6fd7da 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -144,6 +144,20 @@ protected int nextBatchOf( } } + public class TimestampInt96BatchReader extends BatchReader { + @Override + protected int nextBatchOf( + final FieldVector vector, + final int expectedBatchSize, + final int numValsInVector, + final int typeWidth, + NullabilityHolder holder) { + return vectorizedPageIterator + .timestampInt96PageReader() + .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder); + } + } + public class FloatBatchReader extends BatchReader { @Override protected int nextBatchOf( @@ -286,6 +300,10 @@ public TimestampMillisBatchReader timestampMillisBatchReader() { return new TimestampMillisBatchReader(); } + public TimestampInt96BatchReader timestampInt96BatchReader() { + return new TimestampInt96BatchReader(); + } + public FloatBatchReader floatBatchReader() { return new FloatBatchReader(); } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java index 96071fe75ccc..55f1d3fd7908 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java @@ -19,6 +19,7 @@ package org.apache.iceberg.arrow.vectorized.parquet; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.DecimalVector; @@ -26,6 +27,7 @@ import org.apache.arrow.vector.FixedSizeBinaryVector; import org.apache.arrow.vector.IntVector; import org.apache.iceberg.arrow.vectorized.NullabilityHolder; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.parquet.column.Dictionary; /** @@ -105,6 +107,17 @@ protected void nextVal( } } + class TimestampInt96DictEncodedReader extends BaseDictEncodedReader { + @Override + protected void nextVal( + FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) { + ByteBuffer buffer = + dict.decodeToBinary(currentVal).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer); + vector.getDataBuffer().setLong(idx, timestampInt96); + } + } + class 
IntegerDictEncodedReader extends BaseDictEncodedReader { @Override protected void nextVal( @@ -200,6 +213,10 @@ public TimestampMillisDictEncodedReader timestampMillisDictEncodedReader() { return new TimestampMillisDictEncodedReader(); } + public TimestampInt96DictEncodedReader timestampInt96DictEncodedReader() { + return new TimestampInt96DictEncodedReader(); + } + public IntegerDictEncodedReader integerDictEncodedReader() { return new IntegerDictEncodedReader(); } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java index 47d9ae1da82b..6547e2cdcfb0 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java @@ -283,6 +283,32 @@ protected void nextDictEncodedVal( } } + /** Method for reading a batch of values of TimestampInt96 data type. */ + class TimestampInt96PageReader extends BagePageReader { + @Override + protected void nextVal( + FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + vectorizedDefinitionLevelReader + .timestampInt96Reader() + .nextBatch(vector, numVals, typeWidth, batchSize, holder, plainValuesReader); + } + + @Override + protected void nextDictEncodedVal( + FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) { + vectorizedDefinitionLevelReader + .timestampInt96Reader() + .nextDictEncodedBatch( + vector, + numVals, + typeWidth, + batchSize, + holder, + dictionaryEncodedValuesReader, + dictionary); + } + } + /** Method for reading a batch of values of FLOAT data type. 
*/ class FloatPageReader extends BagePageReader { @@ -539,6 +565,10 @@ TimestampMillisPageReader timestampMillisPageReader() { return new TimestampMillisPageReader(); } + TimestampInt96PageReader timestampInt96PageReader() { + return new TimestampInt96PageReader(); + } + FloatPageReader floatPageReader() { return new FloatPageReader(); } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java index a036aee9e683..20d7f804978c 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java @@ -19,6 +19,7 @@ package org.apache.iceberg.arrow.vectorized.parquet; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.BitVector; @@ -29,6 +30,7 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.iceberg.arrow.vectorized.NullabilityHolder; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.parquet.ValuesAsBytesReader; import org.apache.parquet.column.Dictionary; @@ -447,6 +449,51 @@ protected void nextDictEncodedVal( } } + class TimestampInt96Reader extends BaseReader { + @Override + protected void nextVal( + FieldVector vector, + int idx, + ValuesAsBytesReader valuesReader, + int typeWidth, + byte[] byteArray) { + // 8 bytes (time of day nanos) + 4 bytes(julianDay) = 12 bytes + ByteBuffer buffer = valuesReader.getBuffer(12).order(ByteOrder.LITTLE_ENDIAN); + long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer); + vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96); + } + + @Override + protected void nextDictEncodedVal( + FieldVector vector, + int idx, + VectorizedDictionaryEncodedParquetValuesReader reader, + int numValuesToRead, + Dictionary dict, + NullabilityHolder nullabilityHolder, + int typeWidth, + Mode mode) { + switch (mode) { + case RLE: + reader + .timestampInt96DictEncodedReader() + .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth); + break; + case PACKED: + ByteBuffer buffer = + dict.decodeToBinary(reader.readInteger()) + .toByteBuffer() + .order(ByteOrder.LITTLE_ENDIAN); + long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer); + vector.getDataBuffer().setLong(idx, timestampInt96); + break; + default: + throw new UnsupportedOperationException( + "Unsupported mode for timestamp int96 reader: " + mode); + } + } + } + class FixedWidthBinaryReader extends BaseReader { @Override protected void nextVal( @@ -777,6 +824,10 @@ TimestampMillisReader timestampMillisReader() { return new TimestampMillisReader(); } + TimestampInt96Reader timestampInt96Reader() { + return new TimestampInt96Reader(); + } + FixedWidthBinaryReader fixedWidthBinaryReader() { return new FixedWidthBinaryReader(); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java index b6d57d073e0e..a879fc5f51d1 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import 
java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -68,6 +69,8 @@ public class ParquetUtil { // not meant to be instantiated private ParquetUtil() {} + private static final long UNIX_EPOCH_JULIAN = 2_440_588L; + public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) { return fileMetrics(file, metricsConfig, null); } @@ -403,4 +406,17 @@ public static boolean isIntType(PrimitiveType primitiveType) { } return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32; } + + /** + * Reads a timestamp stored as Parquet INT96 from a little-endian ByteBuffer and returns + * microseconds since the Unix epoch. Consumes 12 bytes: 8 bytes of time-of-day nanos followed by + * 4 bytes of Julian day. + */ + public static long extractTimestampInt96(ByteBuffer buffer) { + // 8 bytes (time of day nanos) + long timeOfDayNanos = buffer.getLong(); + // 4 bytes (julianDay) + int julianDay = buffer.getInt(); + return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) + + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + } } diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 836ac46d19d7..bba68684a303 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -25,10 +25,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader; @@ -377,7 +377,6 @@ public long readLong() { } private static class TimestampInt96Reader extends UnboxedReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { super(desc); @@ -392,11 +391,7 @@ public Long read(Long ignored) { public long readLong() { final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + return ParquetUtil.extractTimestampInt96(byteBuffer); } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 5dfb51e14ba1..940e7934ec81 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -25,10 +25,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import 
org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader; @@ -378,7 +378,6 @@ public long readLong() { } private static class TimestampInt96Reader extends UnboxedReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { super(desc); @@ -393,11 +392,7 @@ public Long read(Long ignored) { public long readLong() { final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + return ParquetUtil.extractTimestampInt96(byteBuffer); } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 76e780f7a065..59f81de6ae4a 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -25,10 +25,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader; @@ -377,7 +377,6 @@ public long readLong() { } private static class TimestampInt96Reader extends UnboxedReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { super(desc); @@ -392,11 +391,7 @@ public Long read(Long ignored) { public long readLong() { final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + return ParquetUtil.extractTimestampInt96(byteBuffer); } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 76e780f7a065..59f81de6ae4a 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -25,10 +25,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader; @@ -377,7 +377,6 @@ public long readLong() { } private static class TimestampInt96Reader extends UnboxedReader { - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; TimestampInt96Reader(ColumnDescriptor desc) { super(desc); @@ -392,11 +391,7 @@ public Long read(Long ignored) { public long readLong() { final 
ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); - final long timeOfDayNanos = byteBuffer.getLong(); - final int julianDay = byteBuffer.getInt(); - - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); + return ParquetUtil.extractTimestampInt96(byteBuffer); } } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java index b55ba0e2199a..9bd7220b905a 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java @@ -50,6 +50,11 @@ public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spe return TABLES.create(schema, spec, tableLocation); } + @Override + public void dropTable(TableIdentifier ident) { + TABLES.dropTable(tableLocation); + } + @Override public Table loadTable(TableIdentifier ident, String entriesSuffix) { return TABLES.load(loadLocation(ident, entriesSuffix)); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java index de26f5f82c49..6292a2c1a834 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java @@ -43,11 +43,11 @@ public static void start() { @After public void dropTable() throws IOException { - Table table = catalog.loadTable(currentIdentifier); - Path tablePath = new Path(table.location()); - FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf()); - fs.delete(tablePath, true); - catalog.dropTable(currentIdentifier, false); + if (!catalog.tableExists(currentIdentifier)) { + return; + } + + dropTable(currentIdentifier); } @Override @@ -56,6 +56,15 @@ public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spe return TestIcebergSourceHiveTables.catalog.createTable(ident, schema, spec); } + @Override + public void dropTable(TableIdentifier ident) throws IOException { + Table table = catalog.loadTable(ident); + Path tablePath = new Path(table.location()); + FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf()); + fs.delete(tablePath, true); + catalog.dropTable(ident, false); + } + @Override public Table loadTable(TableIdentifier ident, String entriesSuffix) { TableIdentifier identifier = diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 0f6ae3f20d77..5b34dd269fbd 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -23,8 +23,11 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Comparator; import java.util.List; 
import java.util.StringJoiner; @@ -71,13 +74,18 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.spark.SparkException; +import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; +import org.assertj.core.api.Assertions; +import org.junit.After; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -112,6 +120,13 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase { public abstract String loadLocation(TableIdentifier ident); + public abstract void dropTable(TableIdentifier ident) throws IOException; + + @After + public void removeTable() { + spark.sql("DROP TABLE IF EXISTS parquet_table"); + } + @Test public synchronized void testTablesSupport() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); @@ -471,8 +486,6 @@ public void testFilesTable() throws Exception { @Test public void testFilesTableWithSnapshotIdInheritance() throws Exception { - spark.sql("DROP TABLE IF EXISTS parquet_table"); - TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test"); Table table = createTable(tableIdentifier, SCHEMA, SPEC); table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit(); @@ -495,45 +508,39 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit(); - try { - String stagingLocation = table.location() + "/metadata"; - SparkTableUtil.importSparkTable( - spark, - new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), - table, - stagingLocation); - - Dataset filesTableDs = - spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")); - List actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList(); - - List expected = Lists.newArrayList(); - for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) { - InputFile in = table.io().newInputFile(manifest.path()); - try (CloseableIterable rows = - Avro.read(in).project(entriesTable.schema()).build()) { - for (GenericData.Record record : rows) { - GenericData.Record file = (GenericData.Record) record.get("data_file"); - TestHelpers.asMetadataRecord(file); - expected.add(file); - } + String stagingLocation = table.location() + "/metadata"; + SparkTableUtil.importSparkTable( + spark, + new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), + table, + stagingLocation); + + Dataset filesTableDs = + spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")); + List actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList(); + + List expected = Lists.newArrayList(); + for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) { + InputFile in = table.io().newInputFile(manifest.path()); + try (CloseableIterable rows = + Avro.read(in).project(entriesTable.schema()).build()) { + for (GenericData.Record record : rows) { + GenericData.Record file = (GenericData.Record) record.get("data_file"); + TestHelpers.asMetadataRecord(file); + expected.add(file); } } - - Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs); - 
Assert.assertEquals("Files table should have one row", 2, expected.size()); - Assert.assertEquals("Actual results should have one row", 2, actual.size()); - TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0)); - TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1)); - } finally { - spark.sql("DROP TABLE parquet_table"); } + + Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs); + Assert.assertEquals("Files table should have one row", 2, expected.size()); + Assert.assertEquals("Actual results should have one row", 2, actual.size()); + TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0)); + TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1)); } @Test public void testEntriesTableWithSnapshotIdInheritance() throws Exception { - spark.sql("DROP TABLE IF EXISTS parquet_table"); - TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_inheritance_test"); PartitionSpec spec = SPEC; Table table = createTable(tableIdentifier, SCHEMA, spec); @@ -552,34 +559,30 @@ public void testEntriesTableWithSnapshotIdInheritance() throws Exception { Dataset inputDF = spark.createDataFrame(records, SimpleRecord.class); inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table"); - try { - String stagingLocation = table.location() + "/metadata"; - SparkTableUtil.importSparkTable( - spark, - new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), - table, - stagingLocation); - - List actual = - spark - .read() - .format("iceberg") - .load(loadLocation(tableIdentifier, "entries")) - .select("sequence_number", "snapshot_id", "data_file") - .collectAsList(); - - table.refresh(); - - long snapshotId = table.currentSnapshot().snapshotId(); - - Assert.assertEquals("Entries table should have 2 rows", 2, actual.size()); - Assert.assertEquals("Sequence number must match", 0, actual.get(0).getLong(0)); - Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(0).getLong(1)); - Assert.assertEquals("Sequence number must match", 0, actual.get(1).getLong(0)); - Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(1).getLong(1)); - } finally { - spark.sql("DROP TABLE parquet_table"); - } + String stagingLocation = table.location() + "/metadata"; + SparkTableUtil.importSparkTable( + spark, + new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), + table, + stagingLocation); + + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "entries")) + .select("sequence_number", "snapshot_id", "data_file") + .collectAsList(); + + table.refresh(); + + long snapshotId = table.currentSnapshot().snapshotId(); + + Assert.assertEquals("Entries table should have 2 rows", 2, actual.size()); + Assert.assertEquals("Sequence number must match", 0, actual.get(0).getLong(0)); + Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(0).getLong(1)); + Assert.assertEquals("Sequence number must match", 0, actual.get(1).getLong(0)); + Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(1).getLong(1)); } @Test @@ -1765,6 +1768,69 @@ public void testAllManifestTableSnapshotFiltering() throws Exception { } } + @Test + public void testTableWithInt96Timestamp() throws IOException { + File parquetTableDir = temp.newFolder("table_timestamp_int96"); + String parquetTableLocation = parquetTableDir.toURI().toString(); + Schema schema = + new Schema( + optional(1, "id", Types.LongType.get()), + optional(2, "tmp_col", 
Types.TimestampType.withZone())); + spark.conf().set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), "INT96"); + + LocalDateTime start = LocalDateTime.of(2000, 1, 31, 0, 0, 0); + LocalDateTime end = LocalDateTime.of(2100, 1, 1, 0, 0, 0); + long startSec = start.toEpochSecond(ZoneOffset.UTC); + long endSec = end.toEpochSecond(ZoneOffset.UTC); + Column idColumn = functions.expr("id"); + Column secondsColumn = + functions.expr("(id % " + (endSec - startSec) + " + " + startSec + ")").as("seconds"); + Column timestampColumn = functions.expr("cast( seconds as timestamp) as tmp_col"); + + for (Boolean useDict : new Boolean[] {true, false}) { + for (Boolean useVectorization : new Boolean[] {true, false}) { + spark.sql("DROP TABLE IF EXISTS parquet_table"); + spark + .range(0, 5000, 100, 1) + .select(idColumn, secondsColumn) + .select(idColumn, timestampColumn) + .write() + .format("parquet") + .option("parquet.enable.dictionary", useDict) + .mode("overwrite") + .option("path", parquetTableLocation) + .saveAsTable("parquet_table"); + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table_with_timestamp_int96"); + Table table = createTable(tableIdentifier, schema, PartitionSpec.unpartitioned()); + table + .updateProperties() + .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, useVectorization.toString()) + .commit(); + + String stagingLocation = table.location() + "/metadata"; + SparkTableUtil.importSparkTable( + spark, + new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), + table, + stagingLocation); + + // validate we get the expected results back + List expected = spark.table("parquet_table").select("tmp_col").collectAsList(); + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier)) + .select("tmp_col") + .collectAsList(); + Assertions.assertThat(actual) + .as("Rows must match") + .containsExactlyInAnyOrderElementsOf(expected); + dropTable(tableIdentifier); + } + } + } + private GenericData.Record manifestRecord( Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) { GenericRecordBuilder builder =
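// Editor's note (not part of the patch): a minimal, self-contained sketch of the INT96
// conversion that this change centralizes in ParquetUtil.extractTimestampInt96 and reuses from
// the vectorized dictionary and plain-value readers above. The class and method names below are
// illustrative only; the constant and the arithmetic mirror the patched ParquetUtil. An INT96
// value stores 8 bytes of time-of-day nanos followed by a 4-byte Julian day, little-endian, and
// the reader converts it to microseconds since the Unix epoch.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

public class Int96TimestampSketch {
  // Julian day number of 1970-01-01 (the Unix epoch), as defined in the patched ParquetUtil.
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  // Same logic as ParquetUtil.extractTimestampInt96: read nanos-of-day, then the Julian day,
  // and combine them into epoch microseconds.
  static long extractTimestampInt96(ByteBuffer buffer) {
    long timeOfDayNanos = buffer.getLong();
    int julianDay = buffer.getInt();
    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
  }

  public static void main(String[] args) {
    // Encode 1970-01-01T01:00:00Z the way an INT96 writer lays it out: little-endian,
    // time-of-day nanos first, Julian day last.
    ByteBuffer buffer = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buffer.putLong(TimeUnit.HOURS.toNanos(1));
    buffer.putInt((int) UNIX_EPOCH_JULIAN);
    buffer.flip();

    // Prints 3600000000: one hour expressed as microseconds since the epoch.
    System.out.println(extractTimestampInt96(buffer));
  }
}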