@@ -21,6 +21,7 @@
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.IntStream;
@@ -42,6 +43,7 @@
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.util.DecimalUtility;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -156,6 +158,11 @@ public ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getVecto
return new DictionaryFloatAccessor<>((IntVector) vector, dictionary);
case INT64:
return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
case INT96:
// Impala & Spark used to write timestamps as INT96 by default. For backwards compatibility
// we try to read INT96 as timestamps. But INT96 is deprecated and not recommended
// (see https://issues.apache.org/jira/browse/PARQUET-323)
return new DictionaryTimestampInt96Accessor<>((IntVector) vector, dictionary);
case DOUBLE:
return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary);
default:
@@ -438,6 +445,29 @@ public final byte[] getBinary(int rowId) {
}
}

private static class DictionaryTimestampInt96Accessor<
DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
private final IntVector offsetVector;
private final Dictionary dictionary;

DictionaryTimestampInt96Accessor(IntVector vector, Dictionary dictionary) {
super(vector);
this.offsetVector = vector;
this.dictionary = dictionary;
}

@Override
public final long getLong(int rowId) {
ByteBuffer byteBuffer =
dictionary
.decodeToBinary(offsetVector.get(rowId))
.toByteBuffer()
.order(ByteOrder.LITTLE_ENDIAN);
return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}

private static class DateAccessor<
DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
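For reference, the INT96 layout the dictionary accessor above relies on: 12 bytes per value, with the time-of-day nanos in the first 8 bytes and the Julian day number in the last 4, both little-endian. A minimal standalone sketch of that decoding (the class name and raw-byte entry point are illustrative, not part of this PR):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

class Int96LayoutSketch {
  // 1970-01-01 expressed as a Julian day number, matching ParquetUtil below
  static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  // Decodes one raw 12-byte INT96 value to microseconds since the Unix epoch,
  // the same arithmetic the accessor delegates to ParquetUtil.extractTimestampInt96
  static long toEpochMicros(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong(); // bytes 0-7
    int julianDay = buf.getInt(); // bytes 8-11
    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
  }
}
```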
@@ -112,6 +112,7 @@ private enum ReadType {
FLOAT,
DOUBLE,
TIMESTAMP_MILLIS,
TIMESTAMP_INT96,
TIME_MICROS,
UUID,
DICTIONARY
@@ -192,6 +193,11 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
.timestampMillisBatchReader()
.nextBatch(vec, typeWidth, nullabilityHolder);
break;
case TIMESTAMP_INT96:
vectorizedColumnIterator
.timestampInt96BatchReader()
.nextBatch(vec, typeWidth, nullabilityHolder);
break;
case UUID:
vectorizedColumnIterator
.fixedSizeBinaryBatchReader()
@@ -354,6 +360,17 @@ private void allocateVectorBasedOnTypeName(PrimitiveType primitive, Field arrowF
this.readType = ReadType.INT;
this.typeWidth = (int) IntVector.TYPE_WIDTH;
break;
case INT96:
// Impala & Spark used to write timestamps as INT96 by default. For backwards compatibility
// we try to read INT96 as timestamps. But INT96 is deprecated and not recommended
// (see https://issues.apache.org/jira/browse/PARQUET-323). Values are materialized as epoch
// microseconds, so the vector is allocated with an 8-byte (BigIntVector) type width.
int length = BigIntVector.TYPE_WIDTH;
this.readType = ReadType.TIMESTAMP_INT96;
this.vec = arrowField.createVector(rootAlloc);
vec.setInitialCapacity(batchSize * length);
vec.allocateNew();
this.typeWidth = length;
break;
case FLOAT:
Field floatField =
new Field(
@@ -144,6 +144,20 @@ protected int nextBatchOf(
}
}

public class TimestampInt96BatchReader extends BatchReader {
@Override
protected int nextBatchOf(
final FieldVector vector,
final int expectedBatchSize,
final int numValsInVector,
final int typeWidth,
NullabilityHolder holder) {
return vectorizedPageIterator
.timestampInt96PageReader()
.nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder);
}
}

public class FloatBatchReader extends BatchReader {
@Override
protected int nextBatchOf(
@@ -286,6 +300,10 @@ public TimestampMillisBatchReader timestampMillisBatchReader() {
return new TimestampMillisBatchReader();
}

public TimestampInt96BatchReader timestampInt96BatchReader() {
return new TimestampInt96BatchReader();
}

public FloatBatchReader floatBatchReader() {
return new FloatBatchReader();
}
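Stepping back, the vectorized INT96 path threads through a fixed chain of readers; the page-level readers appear in the later hunks. A hedged call-path summary (comment-only sketch; the enclosing class names come from the Iceberg codebase and are not visible in these hunks):

```java
// VectorizedArrowReader.read(...)                    // case TIMESTAMP_INT96 above
//   -> VectorizedColumnIterator.timestampInt96BatchReader()
//        .nextBatch(vec, typeWidth, nullabilityHolder)
//     -> VectorizedPageIterator.timestampInt96PageReader()
//          .nextBatch(...)
//       -> VectorizedParquetDefinitionLevelReader.timestampInt96Reader()
//            .nextBatch(...)            // plain pages: 12 bytes -> epoch micros
//            .nextDictEncodedBatch(...) // dictionary pages: id -> decodeToBinary -> micros
```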
@@ -19,13 +19,15 @@
package org.apache.iceberg.arrow.vectorized.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.parquet.column.Dictionary;

/**
@@ -105,6 +107,17 @@ protected void nextVal(
}
}

class TimestampInt96DictEncodedReader extends BaseDictEncodedReader {
@Override
protected void nextVal(
FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) {
ByteBuffer buffer =
dict.decodeToBinary(currentVal).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
vector.getDataBuffer().setLong(idx, timestampInt96);
}
}

class IntegerDictEncodedReader extends BaseDictEncodedReader {
@Override
protected void nextVal(
@@ -200,6 +213,10 @@ public TimestampMillisDictEncodedReader timestampMillisDictEncodedReader() {
return new TimestampMillisDictEncodedReader();
}

public TimestampInt96DictEncodedReader timestampInt96DictEncodedReader() {
return new TimestampInt96DictEncodedReader();
}

public IntegerDictEncodedReader integerDictEncodedReader() {
return new IntegerDictEncodedReader();
}
@@ -283,6 +283,32 @@ protected void nextDictEncodedVal(
}
}

/** Method for reading a batch of values of TimestampInt96 data type. */
class TimestampInt96PageReader extends BagePageReader {
@Override
protected void nextVal(
FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) {
vectorizedDefinitionLevelReader
.timestampInt96Reader()
.nextBatch(vector, numVals, typeWidth, batchSize, holder, plainValuesReader);
}

@Override
protected void nextDictEncodedVal(
FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) {
vectorizedDefinitionLevelReader
.timestampInt96Reader()
.nextDictEncodedBatch(
vector,
numVals,
typeWidth,
batchSize,
holder,
dictionaryEncodedValuesReader,
dictionary);
}
}

/** Method for reading a batch of values of FLOAT data type. */
class FloatPageReader extends BagePageReader {

@@ -539,6 +565,10 @@ TimestampMillisPageReader timestampMillisPageReader() {
return new TimestampMillisPageReader();
}

TimestampInt96PageReader timestampInt96PageReader() {
return new TimestampInt96PageReader();
}

FloatPageReader floatPageReader() {
return new FloatPageReader();
}
@@ -19,6 +19,7 @@
package org.apache.iceberg.arrow.vectorized.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BitVector;
@@ -29,6 +30,7 @@
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ValuesAsBytesReader;
import org.apache.parquet.column.Dictionary;

@@ -447,6 +449,43 @@ protected void nextDictEncodedVal(
}
}

class TimestampInt96Reader extends BaseReader {
@Override
protected void nextVal(
FieldVector vector,
int idx,
ValuesAsBytesReader valuesReader,
int typeWidth,
byte[] byteArray) {
// 8 bytes (time-of-day nanos) + 4 bytes (Julian day) = 12 bytes
ByteBuffer buffer = valuesReader.getBuffer(12).order(ByteOrder.LITTLE_ENDIAN);
long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96);
}

@Override
protected void nextDictEncodedVal(
FieldVector vector,
int idx,
VectorizedDictionaryEncodedParquetValuesReader reader,
int numValuesToRead,
Dictionary dict,
NullabilityHolder nullabilityHolder,
int typeWidth,
Mode mode) {
if (Mode.RLE.equals(mode)) {
reader
.timestampInt96DictEncodedReader()
.nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth);
} else if (Mode.PACKED.equals(mode)) {
ByteBuffer buffer =
dict.decodeToBinary(reader.readInteger()).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
vector.getDataBuffer().setLong(idx, timestampInt96);
}
}
}

class FixedWidthBinaryReader extends BaseReader {
@Override
protected void nextVal(
@@ -777,6 +816,10 @@ TimestampMillisReader timestampMillisReader() {
return new TimestampMillisReader();
}

TimestampInt96Reader timestampInt96Reader() {
return new TimestampInt96Reader();
}

FixedWidthBinaryReader fixedWidthBinaryReader() {
return new FixedWidthBinaryReader();
}
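A note on the dictionary branch above: dictionary ids arrive in Parquet's RLE/bit-packed hybrid encoding, which is why nextDictEncodedVal branches on the mode. A hedged paraphrase of the two cases:

```java
// Mode.RLE    - a run of repeated dictionary ids; the run is decoded in bulk via
//               timestampInt96DictEncodedReader().nextBatch(...)
// Mode.PACKED - bit-packed ids; one id per value is read with reader.readInteger(),
//               decoded through the dictionary, and written as epoch micros
```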
16 changes: 16 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -68,6 +69,8 @@ public class ParquetUtil {
// not meant to be instantiated
private ParquetUtil() {}

private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) {
return fileMetrics(file, metricsConfig, null);
}
@@ -403,4 +406,17 @@ public static boolean isIntType(PrimitiveType primitiveType) {
}
return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32;
}

/**
* Reads a timestamp from a Parquet INT96 value in the given ByteBuffer: 8 bytes (time-of-day
* nanos) followed by 4 bytes (Julian day), 12 bytes in total. Returns microseconds since the
* Unix epoch.
*/
public static long extractTimestampInt96(ByteBuffer buffer) {
// 8 bytes (time-of-day nanos)
long timeOfDayNanos = buffer.getLong();
// 4 bytes (Julian day)
int julianDay = buffer.getInt();
return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
+ TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
}
}
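A worked example of the conversion (the harness class is illustrative, not part of this PR): Julian day 2,440,589 is one day after the Unix epoch, and 43,200,000,000,000 ns into the day is 12:00:00, so the expected result is 129,600,000,000 µs, i.e. 1970-01-02T12:00:00 UTC.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.parquet.ParquetUtil;

public class Int96Example {
  public static void main(String[] args) {
    // Build a 12-byte INT96 value: time-of-day nanos first, Julian day last
    ByteBuffer buffer = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buffer.putLong(43_200_000_000_000L); // 12 hours in nanos
    buffer.putInt(2_440_589); // 1970-01-02 as a Julian day
    buffer.flip();

    // 1 day (86_400_000_000 us) + 12 h (43_200_000_000 us) = 129_600_000_000 us
    System.out.println(ParquetUtil.extractTimestampInt96(buffer)); // 129600000000
  }
}
```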
@@ -25,10 +25,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -377,7 +377,6 @@ public long readLong() {
}

private static class TimestampInt96Reader extends UnboxedReader<Long> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
@@ -392,11 +391,7 @@ public Long read(Long ignored) {
public long readLong() {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
+ TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}

@@ -25,10 +25,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -378,7 +378,6 @@ public long readLong() {
}

private static class TimestampInt96Reader extends UnboxedReader<Long> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
@@ -393,11 +392,7 @@ public Long read(Long ignored) {
public long readLong() {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
+ TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}
