Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ public static DecodedTimestamp decodeInt96Timestamp(Binary timestampBinary)

// little endian encoding - need to invert byte order
long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
verify(timeOfDayNanos >= 0 && timeOfDayNanos < NANOSECONDS_PER_DAY, "Invalid timeOfDayNanos: %s", timeOfDayNanos);
int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);

return decodeInt96Timestamp(timeOfDayNanos, julianDay);
}

public static DecodedTimestamp decodeInt96Timestamp(long timeOfDayNanos, int julianDay)
{
verify(timeOfDayNanos >= 0 && timeOfDayNanos < NANOSECONDS_PER_DAY, "Invalid timeOfDayNanos: %s", timeOfDayNanos);
long epochSeconds = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * SECONDS_PER_DAY + timeOfDayNanos / NANOSECONDS_PER_SECOND;
return new DecodedTimestamp(epochSeconds, (int) (timeOfDayNanos % NANOSECONDS_PER_SECOND));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,28 @@
package io.trino.parquet.dictionary;

import io.trino.parquet.DictionaryPage;
import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;

import java.io.IOException;
import io.trino.parquet.reader.SimpleSliceInputStream;
import io.trino.parquet.reader.decoders.ValueDecoder;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.trino.parquet.ParquetReaderUtils.toInputStream;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.LongPlainValueDecoder;

public class DoubleDictionary
implements Dictionary
{
private final double[] content;

public DoubleDictionary(DictionaryPage dictionaryPage)
throws IOException
{
content = new double[dictionaryPage.getDictionarySize()];
DoublePlainValuesReader doubleReader = new DoublePlainValuesReader();
doubleReader.initFromPage(dictionaryPage.getDictionarySize(), toInputStream(dictionaryPage));
for (int i = 0; i < content.length; i++) {
content[i] = doubleReader.readDouble();
int length = dictionaryPage.getDictionarySize();
long[] buffer = new long[length];
ValueDecoder<long[]> doubleReader = new LongPlainValueDecoder();
doubleReader.init(new SimpleSliceInputStream(dictionaryPage.getSlice()));
doubleReader.read(buffer, 0, length);

content = new double[length];
for (int i = 0; i < length; i++) {
content[i] = Double.longBitsToDouble(buffer[i]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,28 @@
package io.trino.parquet.dictionary;

import io.trino.parquet.DictionaryPage;
import org.apache.parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader;

import java.io.IOException;
import io.trino.parquet.reader.SimpleSliceInputStream;
import io.trino.parquet.reader.decoders.ValueDecoder;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.trino.parquet.ParquetReaderUtils.toInputStream;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntPlainValueDecoder;

public class FloatDictionary
implements Dictionary
{
private final float[] content;

public FloatDictionary(DictionaryPage dictionaryPage)
throws IOException
{
content = new float[dictionaryPage.getDictionarySize()];
FloatPlainValuesReader floatReader = new FloatPlainValuesReader();
floatReader.initFromPage(dictionaryPage.getDictionarySize(), toInputStream(dictionaryPage));
for (int i = 0; i < content.length; i++) {
content[i] = floatReader.readFloat();
int length = dictionaryPage.getDictionarySize();
int[] buffer = new int[length];
ValueDecoder<int[]> floatReader = new IntPlainValueDecoder();
floatReader.init(new SimpleSliceInputStream(dictionaryPage.getSlice()));
floatReader.read(buffer, 0, length);

content = new float[length];
for (int i = 0; i < length; i++) {
content[i] = Float.intBitsToFloat(buffer[i]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.parquet.reader.decoders;

import io.trino.parquet.reader.SimpleSliceInputStream;
import io.trino.plugin.base.type.DecodedTimestamp;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;

Expand All @@ -23,8 +22,6 @@
import java.nio.ByteBuffer;

import static io.trino.parquet.ParquetReaderUtils.castToByte;
import static io.trino.parquet.ParquetTimestampUtils.decodeInt96Timestamp;
import static io.trino.parquet.reader.flat.Int96ColumnAdapter.Int96Buffer;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -73,49 +70,4 @@ public void skip(int n)
delegate.skip(n);
}
}

public static final class Int96ApacheParquetValueDecoder
implements ValueDecoder<Int96Buffer>
{
private final ValuesReader delegate;

public Int96ApacheParquetValueDecoder(ValuesReader delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}

@Override
public void init(SimpleSliceInputStream input)
{
initialize(input, delegate);
}

@Override
public void read(Int96Buffer values, int offset, int length)
{
int endOffset = offset + length;
for (int i = offset; i < endOffset; i++) {
DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(delegate.readBytes());
values.longs[i] = decodedTimestamp.epochSeconds();
values.ints[i] = decodedTimestamp.nanosOfSecond();
}
}

@Override
public void skip(int n)
{
delegate.skip(n);
}
}

private static void initialize(SimpleSliceInputStream input, ValuesReader reader)
{
byte[] buffer = input.readBytes();
try {
reader.initFromPage(0, ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, 0, buffer.length)));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
import io.airlift.slice.Slices;
import io.trino.parquet.reader.SimpleSliceInputStream;
import io.trino.parquet.reader.flat.BitPackingUtils;
import io.trino.plugin.base.type.DecodedTimestamp;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Int128;
import org.apache.parquet.column.ColumnDescriptor;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.SIZE_OF_INT;
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.trino.parquet.ParquetReaderUtils.toByteExact;
import static io.trino.parquet.ParquetReaderUtils.toShortExact;
import static io.trino.parquet.ParquetTimestampUtils.decodeInt96Timestamp;
import static io.trino.parquet.ParquetTypeUtils.checkBytesFitInShortDecimal;
import static io.trino.parquet.ParquetTypeUtils.getShortDecimalValue;
import static io.trino.parquet.reader.flat.BitPackingUtils.unpack;
import static io.trino.parquet.reader.flat.Int96ColumnAdapter.Int96Buffer;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
Expand Down Expand Up @@ -334,4 +339,36 @@ public void skip(int n)
input.skip(n * UUID_SIZE);
}
}

public static final class Int96PlainValueDecoder
implements ValueDecoder<Int96Buffer>
{
private static final int LENGTH = SIZE_OF_LONG + SIZE_OF_INT;

private SimpleSliceInputStream input;

@Override
public void init(SimpleSliceInputStream input)
{
this.input = requireNonNull(input, "input is null");
}

@Override
public void read(Int96Buffer values, int offset, int length)
{
input.ensureBytesAvailable(length * LENGTH);
for (int i = offset; i < offset + length; i++) {
DecodedTimestamp timestamp = decodeInt96Timestamp(input.readLongUnsafe(), input.readIntUnsafe());

values.longs[i] = timestamp.epochSeconds();
values.ints[i] = timestamp.nanosOfSecond();
}
}

@Override
public void skip(int n)
{
input.skip(n * LENGTH);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet.reader.decoders;

import io.trino.parquet.reader.SimpleSliceInputStream;
import io.trino.parquet.reader.flat.NullsDecoder;

/**
* Decoder for RLE encoded values of BOOLEAN primitive type
* <a href="https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3">
* Run Length Encoding / Bit-Packing Hybrid (RLE)
* </a>
*/
public final class RleBitPackingHybridBooleanDecoder
implements ValueDecoder<byte[]>
{
private NullsDecoder decoder;

@Override
public void init(SimpleSliceInputStream input)
{
// First int is size in bytes which is not needed here
input.skip(Integer.BYTES);
this.decoder = new NullsDecoder(input.asSlice());
}

@Override
public void read(byte[] values, int offset, int length)
{
boolean[] buffer = new boolean[length];
decoder.readNext(buffer, 0, length);
for (int i = 0; i < length; i++) {
// NullsDecoder returns false for 1 (non-null) and true for 0 (null)
values[offset + i] = buffer[i] ? (byte) 0 : (byte) 1;
}
}

@Override
public void skip(int n)
{
decoder.skip(n);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static io.trino.parquet.ParquetEncoding.PLAIN;
import static io.trino.parquet.ValuesType.VALUES;
import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BooleanApacheParquetValueDecoder;
import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.Int96ApacheParquetValueDecoder;
import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedByteDecoder;
import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedIntDecoder;
import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedLongDecoder;
Expand All @@ -45,6 +44,7 @@
import static io.trino.parquet.reader.decoders.PlainByteArrayDecoders.BoundedVarcharPlainValueDecoder;
import static io.trino.parquet.reader.decoders.PlainByteArrayDecoders.CharPlainValueDecoder;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.BooleanPlainValueDecoder;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.Int96PlainValueDecoder;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntPlainValueDecoder;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntToBytePlainValueDecoder;
import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntToShortPlainValueDecoder;
Expand Down Expand Up @@ -165,15 +165,21 @@ public static ValueDecoder<byte[]> getBooleanDecoder(ParquetEncoding encoding, P
{
return switch (encoding) {
case PLAIN -> new BooleanPlainValueDecoder();
case RLE, BIT_PACKED -> new BooleanApacheParquetValueDecoder(getApacheParquetReader(encoding, field));
case RLE -> new RleBitPackingHybridBooleanDecoder();
// BIT_PACKED is a deprecated encoding which should not be used anymore as per
// https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4
// An unoptimized decoder for this encoding is provided here for compatibility with old files or non-compliant writers
case BIT_PACKED -> new BooleanApacheParquetValueDecoder(getApacheParquetReader(encoding, field));
default -> throw wrongEncoding(encoding, field);
};
}

public static ValueDecoder<Int96Buffer> getInt96Decoder(ParquetEncoding encoding, PrimitiveField field)
{
if (PLAIN.equals(encoding)) {
return new Int96ApacheParquetValueDecoder(getApacheParquetReader(encoding, field));
// INT96 type has been deprecated as per https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0
// However, this encoding is still commonly encountered in parquet files.
return new Int96PlainValueDecoder();
}
throw wrongEncoding(encoding, field);
}
Expand Down
Loading