diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java index 5ce1b0b121832..d71f243d83f9e 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java @@ -76,6 +76,7 @@ import static com.facebook.presto.common.type.DateType.DATE; import static com.facebook.presto.common.type.DecimalType.createDecimalType; import static com.facebook.presto.common.type.Decimals.MAX_PRECISION; +import static com.facebook.presto.common.type.Decimals.MAX_SHORT_PRECISION; import static com.facebook.presto.common.type.DoubleType.DOUBLE; import static com.facebook.presto.common.type.IntegerType.INTEGER; import static com.facebook.presto.common.type.RealType.REAL; @@ -938,24 +939,66 @@ public void testDecimalBackedByINT64() } } + private void testDecimal(int precision, int scale, Optional parquetSchema) + throws Exception + { + ContiguousSet values = bigIntegersBetween(BigDecimal.valueOf(Math.pow(10, precision - 1)).toBigInteger(), BigDecimal.valueOf(Math.pow(10, precision)).toBigInteger()); + ImmutableList.Builder expectedValues = new ImmutableList.Builder<>(); + ImmutableList.Builder writeValues = new ImmutableList.Builder<>(); + for (BigInteger value : limit(values, 1_000)) { + writeValues.add(HiveDecimal.create(value, scale)); + expectedValues.add(new SqlDecimal(value, precision, scale)); + } + tester.testRoundTrip(new JavaHiveDecimalObjectInspector(new DecimalTypeInfo(precision, scale)), + writeValues.build(), + expectedValues.build(), + createDecimalType(precision, scale), + parquetSchema); + } + + @Test + public void testShortDecimalBackedByFixedLenByteArray() + throws Exception + { + int[] scales = {0, 0, 2, 2, 4, 5, 0, 1, 5, 7, 4, 8, 4, 4, 13, 11, 16, 15}; + for (int precision = 1; precision <= 
MAX_SHORT_PRECISION; precision++) { + int scale = scales[precision - 1]; + testDecimal(precision, scale, Optional.empty()); + } + } + @Test - public void testDecimalBackedByFixedLenByteArray() + public void testLongDecimalBackedByFixedLenByteArray() throws Exception { int[] scales = {7, 13, 14, 8, 16, 20, 8, 4, 19, 25, 15, 23, 17, 2, 23, 0, 33, 8, 3, 12}; for (int precision = MAX_PRECISION_INT64 + 1; precision < MAX_PRECISION; precision++) { int scale = scales[precision - MAX_PRECISION_INT64 - 1]; - ContiguousSet values = bigIntegersBetween(BigDecimal.valueOf(Math.pow(10, precision - 1)).toBigInteger(), BigDecimal.valueOf(Math.pow(10, precision)).toBigInteger()); - ImmutableList.Builder expectedValues = new ImmutableList.Builder<>(); - ImmutableList.Builder writeValues = new ImmutableList.Builder<>(); - for (BigInteger value : limit(values, 1_000)) { - writeValues.add(HiveDecimal.create(value, scale)); - expectedValues.add(new SqlDecimal(value, precision, scale)); - } - tester.testRoundTrip(new JavaHiveDecimalObjectInspector(new DecimalTypeInfo(precision, scale)), - writeValues.build(), - expectedValues.build(), - createDecimalType(precision, scale)); + testDecimal(precision, scale, Optional.empty()); + } + } + + @Test + public void testShortDecimalBackedByBinary() + throws Exception + { + int[] scales = {0, 0, 1, 2, 5, 4, 3, 4, 7, 6, 8, 9, 10, 1, 13, 11, 16, 15}; + for (int precision = 1; precision <= MAX_SHORT_PRECISION; precision++) { + int scale = scales[precision - 1]; + MessageType parquetSchema = parseMessageType(format("message hive_decimal { optional BINARY test (DECIMAL(%d, %d)); }", precision, scale)); + testDecimal(precision, scale, Optional.of(parquetSchema)); + } + } + + @Test + public void testLongDecimalBackedByBinary() + throws Exception + { + int[] scales = {1, 1, 7, 8, 22, 3, 15, 14, 7, 21, 6, 12, 1, 15, 14, 29, 17, 7, 26}; + for (int precision = MAX_PRECISION_INT64 + 1; precision < MAX_PRECISION; precision++) { + int scale = scales[precision - 
MAX_PRECISION_INT64 - 1]; + MessageType parquetSchema = parseMessageType(format("message hive_decimal { optional BINARY test (DECIMAL(%d, %d)); }", precision, scale)); + testDecimal(precision, scale, Optional.of(parquetSchema)); } } diff --git a/presto-parquet/pom.xml b/presto-parquet/pom.xml index dcb9007b4ef53..b7da804842929 100644 --- a/presto-parquet/pom.xml +++ b/presto-parquet/pom.xml @@ -35,6 +35,11 @@ units + + com.facebook.airlift + log + + io.airlift aircompressor diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java index 5145f32a27bf3..8ba5226b8375a 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java @@ -13,8 +13,7 @@ */ package com.facebook.presto.parquet; -import com.facebook.presto.common.type.DecimalType; -import com.facebook.presto.common.type.Type; +import com.facebook.airlift.log.Logger; import com.facebook.presto.parquet.batchreader.BinaryFlatBatchReader; import com.facebook.presto.parquet.batchreader.BinaryNestedBatchReader; import com.facebook.presto.parquet.batchreader.BooleanFlatBatchReader; @@ -25,6 +24,8 @@ import com.facebook.presto.parquet.batchreader.Int64NestedBatchReader; import com.facebook.presto.parquet.batchreader.Int64TimestampMicrosFlatBatchReader; import com.facebook.presto.parquet.batchreader.Int64TimestampMicrosNestedBatchReader; +import com.facebook.presto.parquet.batchreader.LongDecimalFlatBatchReader; +import com.facebook.presto.parquet.batchreader.ShortDecimalFlatBatchReader; import com.facebook.presto.parquet.batchreader.TimestampFlatBatchReader; import com.facebook.presto.parquet.batchreader.TimestampNestedBatchReader; import com.facebook.presto.parquet.reader.AbstractColumnReader; @@ -40,43 +41,67 @@ import com.facebook.presto.parquet.reader.ShortDecimalColumnReader; 
import com.facebook.presto.parquet.reader.TimestampColumnReader; import com.facebook.presto.spi.PrestoException; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import java.util.Optional; -import static com.facebook.presto.parquet.ParquetTypeUtils.createDecimalType; +import static com.facebook.presto.parquet.ParquetTypeUtils.isDecimalType; +import static com.facebook.presto.parquet.ParquetTypeUtils.isShortDecimalType; import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeStampMicrosType; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; -import static org.apache.parquet.schema.OriginalType.DECIMAL; import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; import static org.apache.parquet.schema.OriginalType.TIME_MICROS; public class ColumnReaderFactory { + private static final Logger log = Logger.get(ColumnReaderFactory.class); private ColumnReaderFactory() { } public static ColumnReader createReader(RichColumnDescriptor descriptor, boolean batchReadEnabled) { - // decimal is not supported in batch readers - if (batchReadEnabled && descriptor.getPrimitiveType().getOriginalType() != DECIMAL) { + if (batchReadEnabled) { final boolean isNested = descriptor.getPath().length > 1; switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) { case BOOLEAN: return isNested ? new BooleanNestedBatchReader(descriptor) : new BooleanFlatBatchReader(descriptor); case INT32: + if (!isNested && isShortDecimalType(descriptor)) { + return new ShortDecimalFlatBatchReader(descriptor); + } case FLOAT: return isNested ? new Int32NestedBatchReader(descriptor) : new Int32FlatBatchReader(descriptor); case INT64: if (isTimeStampMicrosType(descriptor)) { return isNested ? 
new Int64TimestampMicrosNestedBatchReader(descriptor) : new Int64TimestampMicrosFlatBatchReader(descriptor); } + + if (!isNested && isShortDecimalType(descriptor)) { + int precision = ((DecimalLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getPrecision(); + if (precision < 10) { + log.warn("PrimitiveTypeName is INT64 but precision is less than 10."); + } + return new ShortDecimalFlatBatchReader(descriptor); + } case DOUBLE: return isNested ? new Int64NestedBatchReader(descriptor) : new Int64FlatBatchReader(descriptor); case INT96: return isNested ? new TimestampNestedBatchReader(descriptor) : new TimestampFlatBatchReader(descriptor); case BINARY: + Optional decimalBatchColumnReader = createDecimalBatchColumnReader(descriptor); + if (decimalBatchColumnReader.isPresent()) { + return decimalBatchColumnReader.get(); + } + return isNested ? new BinaryNestedBatchReader(descriptor) : new BinaryFlatBatchReader(descriptor); + case FIXED_LEN_BYTE_ARRAY: + if (!isNested) { + decimalBatchColumnReader = createDecimalBatchColumnReader(descriptor); + if (decimalBatchColumnReader.isPresent()) { + return decimalBatchColumnReader.get(); + } + } } } @@ -109,17 +134,24 @@ public static ColumnReader createReader(RichColumnDescriptor descriptor, boolean } } + private static Optional createDecimalBatchColumnReader(RichColumnDescriptor descriptor) + { + if (isDecimalType(descriptor)) { + if (isShortDecimalType(descriptor)) { + return Optional.of(new ShortDecimalFlatBatchReader(descriptor)); + } + return Optional.of(new LongDecimalFlatBatchReader(descriptor)); + } + return Optional.empty(); + } + private static Optional createDecimalColumnReader(RichColumnDescriptor descriptor) { - Optional type = createDecimalType(descriptor); - if (type.isPresent()) { - DecimalType decimalType = (DecimalType) type.get(); - if (decimalType.isShort()) { return Optional.of(new 
ShortDecimalColumnReader(descriptor)); } - else { - return Optional.of(new LongDecimalColumnReader(descriptor)); - } + return Optional.of(new LongDecimalColumnReader(descriptor)); } return Optional.empty(); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java index f30eb1961b52d..6ae1d1007f0c0 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java @@ -14,8 +14,6 @@ package com.facebook.presto.parquet; import com.facebook.presto.common.Subfield; -import com.facebook.presto.common.type.DecimalType; -import com.facebook.presto.common.type.Type; import com.google.common.collect.ImmutableList; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; @@ -26,8 +24,9 @@ import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.PrimitiveColumnIO; -import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import java.util.Arrays; @@ -37,6 +36,7 @@ import java.util.Map; import java.util.Optional; +import static com.facebook.presto.common.type.Decimals.MAX_SHORT_PRECISION; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.getOnlyElement; import static java.util.stream.Collectors.joining; @@ -229,19 +229,6 @@ public static ColumnIO lookupColumnByName(GroupColumnIO groupColumnIO, String co return null; } - public static Optional createDecimalType(RichColumnDescriptor descriptor) - { - if (descriptor.getPrimitiveType().getOriginalType() != DECIMAL) 
{ - return Optional.empty(); - } - return Optional.of(createDecimalType(descriptor.getPrimitiveType().getDecimalMetadata())); - } - - private static Type createDecimalType(DecimalMetadata decimalMetadata) - { - return DecimalType.createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale()); - } - /** * For optional fields: * definitionLevel == maxDefinitionLevel => Value is defined @@ -253,20 +240,33 @@ public static boolean isValueNull(boolean required, int definitionLevel, int max return !required && (definitionLevel == maxDefinitionLevel - 1); } - // copied from presto-hive DecimalUtils public static long getShortDecimalValue(byte[] bytes) { - long value = 0; - if ((bytes[0] & 0x80) != 0) { - for (int i = 0; i < 8 - bytes.length; ++i) { - value |= 0xFFL << (8 * (7 - i)); - } - } + return getShortDecimalValue(bytes, 0, bytes.length); + } - for (int i = 0; i < bytes.length; i++) { - value |= ((long) bytes[bytes.length - i - 1] & 0xFFL) << (8 * i); + public static long getShortDecimalValue(byte[] bytes, int startOffset, int length) + { + long value = 0; + switch (length) { + case 8: + value |= bytes[startOffset + 7] & 0xFFL; + case 7: + value |= (bytes[startOffset + 6] & 0xFFL) << 8; + case 6: + value |= (bytes[startOffset + 5] & 0xFFL) << 16; + case 5: + value |= (bytes[startOffset + 4] & 0xFFL) << 24; + case 4: + value |= (bytes[startOffset + 3] & 0xFFL) << 32; + case 3: + value |= (bytes[startOffset + 2] & 0xFFL) << 40; + case 2: + value |= (bytes[startOffset + 1] & 0xFFL) << 48; + case 1: + value |= (bytes[startOffset] & 0xFFL) << 56; } - + value = value >> ((8 - length) * 8); return value; } @@ -335,4 +335,20 @@ public static boolean isTimeStampMicrosType(ColumnDescriptor descriptor) { return TIMESTAMP_MICROS.equals(descriptor.getPrimitiveType().getOriginalType()); } + + public static boolean isShortDecimalType(ColumnDescriptor descriptor) + { + LogicalTypeAnnotation logicalTypeAnnotation = 
descriptor.getPrimitiveType().getLogicalTypeAnnotation(); + if (!(logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation)) { + return false; + } + + DecimalLogicalTypeAnnotation decimalLogicalTypeAnnotation = (DecimalLogicalTypeAnnotation) logicalTypeAnnotation; + return decimalLogicalTypeAnnotation.getPrecision() <= MAX_SHORT_PRECISION; + } + + public static boolean isDecimalType(ColumnDescriptor columnDescriptor) + { + return columnDescriptor.getPrimitiveType().getOriginalType() == DECIMAL; + } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java index 0be7547b9c66f..480f0892fbd77 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/BytesUtils.java @@ -61,4 +61,9 @@ public static void unpack8Values(byte inByte, byte[] out, int outPos) out[6 + outPos] = (byte) (inByte >> 6 & 1); out[7 + outPos] = (byte) (inByte >> 7 & 1); } + + public static long propagateSignBit(long value, int bitsToPad) + { + return value << bitsToPad >> bitsToPad; + } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/SimpleSliceInputStream.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/SimpleSliceInputStream.java new file mode 100644 index 0000000000000..ebf81bcf7525a --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/SimpleSliceInputStream.java @@ -0,0 +1,167 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader; + +import io.airlift.slice.Slice; +import io.airlift.slice.UnsafeSlice; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkPositionIndexes; +import static java.util.Objects.requireNonNull; + +/** + * Basic input stream based on a given Slice object. + * This is a simpler version of BasicSliceInput with a few additional methods. + *

+ * Note that methods starting with 'read' modify the underlying offset, while 'get' methods return + * value without modifying the state + */ +public final class SimpleSliceInputStream +{ + private final Slice slice; + private int offset; + + public SimpleSliceInputStream(Slice slice) + { + this(slice, 0); + } + + public SimpleSliceInputStream(Slice slice, int offset) + { + this.slice = requireNonNull(slice, "slice is null"); + checkArgument(slice.length() == 0 || slice.hasByteArray(), "SimpleSliceInputStream only supports slices backed by byte array"); + this.offset = offset; + } + + public byte readByte() + { + return slice.getByte(offset++); + } + + public short readShort() + { + short value = slice.getShort(offset); + offset += Short.BYTES; + return value; + } + + public int readInt() + { + int value = slice.getInt(offset); + offset += Integer.BYTES; + return value; + } + + public long readLong() + { + long value = slice.getLong(offset); + offset += Long.BYTES; + return value; + } + + public byte[] readBytes() + { + byte[] bytes = slice.getBytes(); + offset = slice.length(); + return bytes; + } + + public void readBytes(Slice destination, int destinationIndex, int length) + { + slice.getBytes(offset, destination, destinationIndex, length); + offset += length; + } + + public void skip(int n) + { + offset += n; + } + + public Slice asSlice() + { + return slice.slice(offset, slice.length() - offset); + } + + /** + * Returns the byte array wrapped by this Slice. + * Callers should take care to use {@link SimpleSliceInputStream#getByteArrayOffset()} + * since the contents of this Slice may not start at array index 0. + */ + public byte[] getByteArray() + { + return slice.byteArray(); + } + + /** + * Returns the start index of the content of this slice within the byte array wrapped by this slice. 
+ */ + public int getByteArrayOffset() + { + return offset + slice.byteArrayOffset(); + } + + public void ensureBytesAvailable(int bytes) + { + checkPositionIndexes(offset, offset + bytes, slice.length()); + } + + /** + * Always check if needed data is available with ensureBytesAvailable method. + * Failing to do so may result in instant JVM crash. + */ + public int readIntUnsafe() + { + int value = UnsafeSlice.getIntUnchecked(slice, offset); + offset += Integer.BYTES; + return value; + } + + /** + * Always check if needed data is available with ensureBytesAvailable method. + * Failing to do so may result in instant JVM crash. + */ + public long readLongUnsafe() + { + long value = UnsafeSlice.getLongUnchecked(slice, offset); + offset += Long.BYTES; + return value; + } + + /** + * Always check if needed data is available with ensureBytesAvailable method. + * Failing to do so may result in instant JVM crash. + */ + public byte getByteUnsafe(int index) + { + return UnsafeSlice.getByteUnchecked(slice, offset + index); + } + + /** + * Always check if needed data is available with ensureBytesAvailable method. + * Failing to do so may result in instant JVM crash. + */ + public int getIntUnsafe(int index) + { + return UnsafeSlice.getIntUnchecked(slice, offset + index); + } + + /** + * Always check if needed data is available with ensureBytesAvailable method. + * Failing to do so may result in instant JVM crash. 
+ */ + public long getLongUnsafe(int index) + { + return UnsafeSlice.getLongUnchecked(slice, offset + index); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java index c9254c28a7752..d806c1befe682 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java @@ -19,13 +19,25 @@ import com.facebook.presto.parquet.ParquetEncoding; import com.facebook.presto.parquet.RichColumnDescriptor; import com.facebook.presto.parquet.batchreader.decoders.delta.BinaryDeltaValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.delta.BinaryLongDecimalDeltaValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.delta.BinaryShortDecimalDeltaValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.delta.FixedLenByteArrayLongDecimalDeltaValueDecoder; +import com.facebook.presto.parquet.batchreader.decoders.delta.FixedLenByteArrayShortDecimalDeltaValueDecoder; import com.facebook.presto.parquet.batchreader.decoders.delta.Int32DeltaBinaryPackedValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.delta.Int32ShortDecimalDeltaValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.delta.Int64DeltaBinaryPackedValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.delta.Int64ShortDecimalDeltaValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.delta.Int64TimestampMicrosDeltaBinaryPackedValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.BinaryLongDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.BinaryPlainValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.BinaryShortDecimalPlainValuesDecoder; 
import com.facebook.presto.parquet.batchreader.decoders.plain.BooleanPlainValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayLongDecimalPlainValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayShortDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int32PlainValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.Int32ShortDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int64PlainValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.Int64ShortDecimalPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.Int64TimestampMicrosPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.plain.TimestampPlainValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.BinaryRLEDictionaryValuesDecoder; @@ -33,6 +45,8 @@ import com.facebook.presto.parquet.batchreader.decoders.rle.Int32RLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.Int64RLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.Int64TimestampMicrosRLEDictionaryValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.LongDecimalRLEDictionaryValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.ShortDecimalRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.decoders.rle.TimestampRLEDictionaryValuesDecoder; import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary; import com.facebook.presto.parquet.batchreader.dictionary.TimestampDictionary; @@ -43,6 +57,7 @@ import io.airlift.slice.Slice; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesReader; import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import java.io.ByteArrayInputStream; @@ -61,7 +76,10 @@ import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_IO_READ_ERROR; import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_UNSUPPORTED_COLUMN_TYPE; import static com.facebook.presto.parquet.ParquetErrorCode.PARQUET_UNSUPPORTED_ENCODING; +import static com.facebook.presto.parquet.ParquetTypeUtils.isDecimalType; +import static com.facebook.presto.parquet.ParquetTypeUtils.isShortDecimalType; import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeStampMicrosType; +import static com.facebook.presto.parquet.ValuesType.VALUES; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; @@ -98,20 +116,41 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript case BOOLEAN: return new BooleanPlainValuesDecoder(buffer, offset, length); case INT32: + if (isShortDecimalType(columnDescriptor)) { + return new Int32ShortDecimalPlainValuesDecoder(buffer, offset, length); + } case FLOAT: return new Int32PlainValuesDecoder(buffer, offset, length); case INT64: { if (isTimeStampMicrosType(columnDescriptor)) { return new Int64TimestampMicrosPlainValuesDecoder(buffer, offset, length); } + + if (isShortDecimalType(columnDescriptor)) { + return new Int64ShortDecimalPlainValuesDecoder(buffer, offset, length); + } } case DOUBLE: return new Int64PlainValuesDecoder(buffer, offset, length); case INT96: return new TimestampPlainValuesDecoder(buffer, offset, length); case BINARY: + if (isDecimalType(columnDescriptor)) { + if (isShortDecimalType(columnDescriptor)) { + return new BinaryShortDecimalPlainValuesDecoder(buffer, offset, length); + } + return new BinaryLongDecimalPlainValuesDecoder(buffer, offset, length); + } return new BinaryPlainValuesDecoder(buffer, offset, length); case FIXED_LEN_BYTE_ARRAY: + 
if (isDecimalType(columnDescriptor)) { + if (isShortDecimalType(columnDescriptor)) { + return new FixedLenByteArrayShortDecimalPlainValuesDecoder(columnDescriptor, buffer, offset, length); + } + + int typeLength = columnDescriptor.getPrimitiveType().getTypeLength(); + return new FixedLenByteArrayLongDecimalPlainValuesDecoder(typeLength, buffer, offset, length); + } default: throw new PrestoException(PARQUET_UNSUPPORTED_COLUMN_TYPE, format("Column: %s, Encoding: %s", columnDescriptor, encoding)); } @@ -146,6 +185,12 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript return new BinaryRLEDictionaryValuesDecoder(bitWidth, inputStream, (BinaryBatchDictionary) dictionary); } case FIXED_LEN_BYTE_ARRAY: + if (isDecimalType(columnDescriptor)) { + if (isShortDecimalType(columnDescriptor)) { + return new ShortDecimalRLEDictionaryValuesDecoder(bitWidth, inputStream, (BinaryBatchDictionary) dictionary); + } + return new LongDecimalRLEDictionaryValuesDecoder(bitWidth, inputStream, (BinaryBatchDictionary) dictionary); + } default: throw new PrestoException(PARQUET_UNSUPPORTED_COLUMN_TYPE, format("Column: %s, Encoding: %s", columnDescriptor, encoding)); } @@ -155,6 +200,10 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript ByteBufferInputStream inputStream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, offset, length)); switch (type) { case INT32: + if (isShortDecimalType(columnDescriptor)) { + ValuesReader parquetReader = getParquetReader(encoding, columnDescriptor, valueCount, inputStream); + return new Int32ShortDecimalDeltaValuesDecoder(parquetReader); + } case FLOAT: { return new Int32DeltaBinaryPackedValuesDecoder(valueCount, inputStream); } @@ -162,6 +211,11 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript if (isTimeStampMicrosType(columnDescriptor)) { return new Int64TimestampMicrosDeltaBinaryPackedValuesDecoder(valueCount, inputStream); } + + if 
(isShortDecimalType(columnDescriptor)) { + ValuesReader parquetReader = getParquetReader(encoding, columnDescriptor, valueCount, inputStream); + return new Int64ShortDecimalDeltaValuesDecoder(parquetReader); + } } case DOUBLE: { return new Int64DeltaBinaryPackedValuesDecoder(valueCount, inputStream); @@ -173,11 +227,40 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript if ((encoding == DELTA_BYTE_ARRAY || encoding == DELTA_LENGTH_BYTE_ARRAY) && type == PrimitiveTypeName.BINARY) { ByteBufferInputStream inputStream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, offset, length)); + if (isDecimalType(columnDescriptor)) { + if (isShortDecimalType(columnDescriptor)) { + return new BinaryShortDecimalDeltaValuesDecoder(encoding, valueCount, inputStream); + } + + return new BinaryLongDecimalDeltaValuesDecoder(encoding, valueCount, inputStream); + } return new BinaryDeltaValuesDecoder(encoding, valueCount, inputStream); } + + if (encoding == DELTA_BYTE_ARRAY && type == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + if (isDecimalType(columnDescriptor)) { + ByteBufferInputStream inputStream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, offset, length)); + ValuesReader parquetReader = getParquetReader(encoding, columnDescriptor, valueCount, inputStream); + + if (isShortDecimalType(columnDescriptor)) { + return new FixedLenByteArrayShortDecimalDeltaValueDecoder(parquetReader, columnDescriptor); + } + + return new FixedLenByteArrayLongDecimalDeltaValueDecoder(parquetReader); + } + } + throw new PrestoException(PARQUET_UNSUPPORTED_ENCODING, format("Column: %s, Encoding: %s", columnDescriptor, encoding)); } + private static ValuesReader getParquetReader(ParquetEncoding encoding, ColumnDescriptor descriptor, int valueCount, ByteBufferInputStream inputStream) + throws IOException + { + ValuesReader valuesReader = encoding.getValuesReader(descriptor, VALUES); + valuesReader.initFromPage(valueCount, inputStream); + return valuesReader; + } + 
private static FlatDecoders readFlatPageV1(DataPageV1 page, RichColumnDescriptor columnDescriptor, Dictionary dictionary) throws IOException { diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java index 866ec7f7304cf..1c82355ee2b02 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java @@ -82,5 +82,25 @@ interface BooleanValuesDecoder void skip(int length); } + interface ShortDecimalValuesDecoder + extends ValuesDecoder + { + void readNext(long[] values, int offset, int length) + throws IOException; + + void skip(int length) + throws IOException; + } + + interface LongDecimalValuesDecoder + extends ValuesDecoder + { + void readNext(long[] values, int offset, int length) + throws IOException; + + void skip(int length) + throws IOException; + } + public long getRetainedSizeInBytes(); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder.java new file mode 100644 index 0000000000000..76ccdc2a8a847 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; +import org.apache.parquet.column.values.ValuesReader; +import org.openjdk.jol.info.ClassLayout; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public abstract class AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder + implements ShortDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder.class).instanceSize(); + + protected final ValuesReader delegate; + + public AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void readNext(long[] values, int offset, int length) + { + for (int i = offset; i < offset + length; i++) { + values[i] = readData(); + } + } + + protected abstract long readData(); + + @Override + public void skip(int length) + { + checkArgument(length >= 0, "invalid length %s", length); + delegate.skip(length); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/BinaryLongDecimalDeltaValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/BinaryLongDecimalDeltaValuesDecoder.java new file mode 100644 index 
0000000000000..676ca0fadcebd --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/BinaryLongDecimalDeltaValuesDecoder.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.parquet.ParquetEncoding; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.LongDecimalValuesDecoder; +import io.airlift.slice.Slice; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.openjdk.jol.info.ClassLayout; + +import java.io.IOException; +import java.math.BigInteger; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static java.util.Objects.requireNonNull; + +public class BinaryLongDecimalDeltaValuesDecoder + implements LongDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BinaryLongDecimalDeltaValuesDecoder.class).instanceSize(); + + private final BinaryDeltaValuesDecoder delegate; + + public BinaryLongDecimalDeltaValuesDecoder(ParquetEncoding encoding, int valueCount, ByteBufferInputStream bufferInputStream) + throws IOException + { + requireNonNull(encoding, "encoding is null"); + requireNonNull(bufferInputStream, "bufferInputStream is null"); + delegate = new BinaryDeltaValuesDecoder(encoding, valueCount, 
bufferInputStream); + } + + @Override + public void readNext(long[] values, int offset, int length) + throws IOException + { + BinaryValuesDecoder.ValueBuffer valueBuffer = delegate.readNext(length); + int bufferSize = valueBuffer.getBufferSize(); + byte[] byteBuffer = new byte[bufferSize]; + int[] offsets = new int[bufferSize + 1]; + delegate.readIntoBuffer(byteBuffer, 0, offsets, 0, valueBuffer); + + for (int i = 0; i < length; i++) { + int positionOffset = offsets[i]; + int positionLength = offsets[i + 1] - positionOffset; + byte[] temp = new byte[positionLength]; + System.arraycopy(byteBuffer, positionOffset, temp, 0, positionLength); + Slice slice = Decimals.encodeUnscaledValue(new BigInteger(temp)); + values[2 * (offset + i)] = slice.getLong(0); + values[2 * (offset + i) + 1] = slice.getLong(SIZE_OF_LONG); + } + } + + @Override + public void skip(int length) + throws IOException + { + checkArgument(length >= 0, "invalid length %s", length); + delegate.skip(length); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/BinaryShortDecimalDeltaValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/BinaryShortDecimalDeltaValuesDecoder.java new file mode 100644 index 0000000000000..71793909888ab --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/BinaryShortDecimalDeltaValuesDecoder.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import com.facebook.presto.parquet.ParquetEncoding; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.io.ParquetDecodingException; +import org.openjdk.jol.info.ClassLayout; + +import java.io.IOException; + +import static com.facebook.presto.parquet.ParquetTypeUtils.getShortDecimalValue; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class BinaryShortDecimalDeltaValuesDecoder + implements ShortDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BinaryShortDecimalDeltaValuesDecoder.class).instanceSize(); + + private final BinaryDeltaValuesDecoder delegate; + + public BinaryShortDecimalDeltaValuesDecoder(ParquetEncoding encoding, int valueCount, ByteBufferInputStream bufferInputStream) + throws IOException + { + requireNonNull(encoding, "encoding is null"); + requireNonNull(bufferInputStream, "bufferInputStream is null"); + delegate = new BinaryDeltaValuesDecoder(encoding, valueCount, bufferInputStream); + } + + @Override + public void readNext(long[] values, int offset, int length) + throws IOException + { + BinaryValuesDecoder.ValueBuffer valueBuffer = delegate.readNext(length); + int bufferSize = valueBuffer.getBufferSize(); + byte[] byteBuffer = new byte[bufferSize]; + int[] offsets = new int[bufferSize + 1]; + 
delegate.readIntoBuffer(byteBuffer, 0, offsets, 0, valueBuffer); + + for (int i = 0; i < length; i++) { + int positionOffset = offsets[i]; + int positionLength = offsets[i + 1] - positionOffset; + if (positionLength > 8) { + throw new ParquetDecodingException("Unable to read BINARY type decimal of size " + positionLength + " as a short decimal"); + } + + values[offset + i] = getShortDecimalValue(byteBuffer, positionOffset, positionLength); + } + } + + @Override + public void skip(int length) + throws IOException + { + checkArgument(length >= 0, "invalid length %s", length); + delegate.skip(length); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayLongDecimalDeltaValueDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayLongDecimalDeltaValueDecoder.java new file mode 100644 index 0000000000000..f0a4cdb931d09 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayLongDecimalDeltaValueDecoder.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.LongDecimalValuesDecoder; +import io.airlift.slice.Slice; +import org.apache.parquet.column.values.ValuesReader; +import org.openjdk.jol.info.ClassLayout; + +import java.math.BigInteger; + +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static java.util.Objects.requireNonNull; + +/** + * Note: this is not an optimized values decoder. It makes use of the existing Parquet decoder. Given that this type encoding + * is not a common one, just use the existing one provided by Parquet library and add a wrapper around it that satisfies the + * {@link LongDecimalValuesDecoder} interface. + */ +public class FixedLenByteArrayLongDecimalDeltaValueDecoder + implements LongDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(FixedLenByteArrayLongDecimalDeltaValueDecoder.class).instanceSize(); + + private final ValuesReader delegate; + + public FixedLenByteArrayLongDecimalDeltaValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void readNext(long[] values, int offset, int length) + { + int endOffset = (offset + length) * 2; + for (int currentOutputOffset = offset * 2; currentOutputOffset < endOffset; currentOutputOffset += 2) { + byte[] inputBytes = delegate.readBytes().getBytes(); + Slice slice = Decimals.encodeUnscaledValue(new BigInteger(inputBytes)); + values[currentOutputOffset] = slice.getLong(0); + values[currentOutputOffset + 1] = slice.getLong(SIZE_OF_LONG); + } + } + + @Override + public void skip(int length) + { + delegate.skip(length); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } +} diff --git 
a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayShortDecimalDeltaValueDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayShortDecimalDeltaValueDecoder.java new file mode 100644 index 0000000000000..86b73c43d8331 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/FixedLenByteArrayShortDecimalDeltaValueDecoder.java @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.openjdk.jol.info.ClassLayout; + +import static com.facebook.presto.parquet.ParquetTypeUtils.getShortDecimalValue; +import static com.facebook.presto.parquet.batchreader.decoders.plain.FixedLenByteArrayShortDecimalPlainValuesDecoder.checkBytesFitInShortDecimal; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +/** + * Note: this is not an optimized values decoder. 
It makes use of the existing Parquet decoder. Given that this type encoding + * is not a common one, just use the existing one provided by Parquet library and add a wrapper around it that satisfies the + * {@link ShortDecimalValuesDecoder} interface. + */ +public class FixedLenByteArrayShortDecimalDeltaValueDecoder + implements ShortDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(FixedLenByteArrayShortDecimalDeltaValueDecoder.class).instanceSize(); + + private final ValuesReader delegate; + private final ColumnDescriptor descriptor; + private final int typeLength; + + public FixedLenByteArrayShortDecimalDeltaValueDecoder(ValuesReader delegate, ColumnDescriptor descriptor) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.descriptor = requireNonNull(descriptor, "descriptor is null"); + LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); + checkArgument( + logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation + && ((DecimalLogicalTypeAnnotation) logicalTypeAnnotation).getPrecision() <= Decimals.MAX_SHORT_PRECISION, + "Column %s is not a short decimal", + descriptor); + this.typeLength = descriptor.getPrimitiveType().getTypeLength(); + checkArgument(typeLength > 0 && typeLength <= 16, "Expected column %s to have type length in range (1-16)", descriptor); + } + + @Override + public void readNext(long[] values, int offset, int length) + { + int bytesOffset = 0; + int bytesLength = typeLength; + if (typeLength > Long.BYTES) { + bytesOffset = typeLength - Long.BYTES; + bytesLength = Long.BYTES; + } + for (int i = offset; i < offset + length; i++) { + byte[] bytes = delegate.readBytes().getBytes(); + checkBytesFitInShortDecimal(bytes, 0, bytesOffset, descriptor); + values[i] = getShortDecimalValue(bytes, bytesOffset, bytesLength); + } + } + + @Override + public void skip(int length) + { + delegate.skip(length); + } + + @Override + public long 
getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int32ShortDecimalDeltaValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int32ShortDecimalDeltaValuesDecoder.java new file mode 100644 index 0000000000000..101d6d12917f0 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int32ShortDecimalDeltaValuesDecoder.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import org.apache.parquet.column.values.ValuesReader; + +public class Int32ShortDecimalDeltaValuesDecoder + extends AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder +{ + public Int32ShortDecimalDeltaValuesDecoder(ValuesReader delegate) + { + super(delegate); + } + + @Override + protected long readData() + { + return delegate.readInteger(); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int64ShortDecimalDeltaValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int64ShortDecimalDeltaValuesDecoder.java new file mode 100644 index 0000000000000..3fbc2bf3686b2 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int64ShortDecimalDeltaValuesDecoder.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.delta; + +import org.apache.parquet.column.values.ValuesReader; + +public class Int64ShortDecimalDeltaValuesDecoder + extends AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder +{ + public Int64ShortDecimalDeltaValuesDecoder(ValuesReader delegate) + { + super(delegate); + } + + @Override + protected long readData() + { + return delegate.readLong(); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/BinaryLongDecimalPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/BinaryLongDecimalPlainValuesDecoder.java new file mode 100644 index 0000000000000..81f86f376c42e --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/BinaryLongDecimalPlainValuesDecoder.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.plain; + +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.LongDecimalValuesDecoder; +import com.facebook.presto.parquet.batchreader.decoders.plain.BinaryPlainValuesDecoder.PlainValueBuffer; +import io.airlift.slice.Slice; +import org.openjdk.jol.info.ClassLayout; + +import java.math.BigInteger; +import java.util.Arrays; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static java.util.Objects.requireNonNull; + +public class BinaryLongDecimalPlainValuesDecoder + implements LongDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BinaryLongDecimalPlainValuesDecoder.class).instanceSize(); + + private final BinaryPlainValuesDecoder delegate; + + public BinaryLongDecimalPlainValuesDecoder(byte[] buffer, int bufOffset, int bufLength) + { + requireNonNull(buffer, "buffer is null"); + delegate = new BinaryPlainValuesDecoder(buffer, bufOffset, bufLength); + } + + @Override + public void readNext(long[] values, int offset, int length) + { + PlainValueBuffer valueBuffer = (PlainValueBuffer) delegate.readNext(length); + int bufferSize = valueBuffer.getBufferSize(); + byte[] byteBuffer = new byte[bufferSize]; + int[] offsets = new int[bufferSize + 1]; + delegate.readIntoBuffer(byteBuffer, 0, offsets, 0, valueBuffer); + + for (int i = 0; i < length; i++) { + int positionOffset = offsets[i]; + int positionLength = offsets[i + 1] - positionOffset; + byte[] temp = Arrays.copyOfRange(byteBuffer, positionOffset, positionOffset + positionLength); + Slice slice = Decimals.encodeUnscaledValue(new BigInteger(temp)); + values[2 * (offset + i)] = slice.getLong(0); + values[2 * (offset + i) + 1] = slice.getLong(SIZE_OF_LONG); + } + } + + @Override + public void skip(int length) + { + checkArgument(length >= 0, "invalid length %s", length); + 
delegate.skip(length); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/BinaryShortDecimalPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/BinaryShortDecimalPlainValuesDecoder.java new file mode 100644 index 0000000000000..102872869857e --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/BinaryShortDecimalPlainValuesDecoder.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.plain; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BinaryValuesDecoder.ValueBuffer; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; +import org.apache.parquet.io.ParquetDecodingException; +import org.openjdk.jol.info.ClassLayout; + +import static com.facebook.presto.parquet.ParquetTypeUtils.getShortDecimalValue; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class BinaryShortDecimalPlainValuesDecoder + implements ShortDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BinaryShortDecimalPlainValuesDecoder.class).instanceSize(); + + private final BinaryPlainValuesDecoder delegate; + + public BinaryShortDecimalPlainValuesDecoder(byte[] byteBuffer, int bufferOffset, int length) + { + requireNonNull(byteBuffer, "buffer is null"); + delegate = new BinaryPlainValuesDecoder(byteBuffer, bufferOffset, length); + } + + @Override + public void readNext(long[] values, int offset, int length) + { + ValueBuffer valueBuffer = delegate.readNext(length); + int bufferSize = valueBuffer.getBufferSize(); + byte[] byteBuffer = new byte[bufferSize]; + int[] offsets = new int[bufferSize + 1]; + delegate.readIntoBuffer(byteBuffer, 0, offsets, 0, valueBuffer); + + for (int i = 0; i < length; i++) { + int positionOffset = offsets[i]; + int positionLength = offsets[i + 1] - positionOffset; + if (positionLength > 8) { + throw new ParquetDecodingException("Unable to read BINARY type decimal of size " + positionLength + " as a short decimal"); + } + + values[offset + i] = getShortDecimalValue(byteBuffer, positionOffset, positionLength); + } + } + + @Override + public void skip(int length) + { + checkArgument(length >= 0, "invalid length %s", length); + delegate.skip(length); + } + + @Override + public long getRetainedSizeInBytes() + { + 
return INSTANCE_SIZE; + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayLongDecimalPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayLongDecimalPlainValuesDecoder.java new file mode 100644 index 0000000000000..08182cbeb2a1b --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayLongDecimalPlainValuesDecoder.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.plain; + +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.LongDecimalValuesDecoder; +import io.airlift.slice.Slice; +import org.openjdk.jol.info.ClassLayout; + +import java.math.BigInteger; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static io.airlift.slice.SizeOf.sizeOf; +import static java.util.Objects.requireNonNull; + +public class FixedLenByteArrayLongDecimalPlainValuesDecoder + implements LongDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BinaryLongDecimalPlainValuesDecoder.class).instanceSize(); + + private final int typeLength; + private final byte[] inputBytes; + private final byte[] byteBuffer; + private final int bufferEnd; + + private int bufferOffset; + + public FixedLenByteArrayLongDecimalPlainValuesDecoder(int typeLength, byte[] byteBuffer, int bufferOffset, int length) + { + checkArgument(typeLength > 0 && typeLength <= 16, "typeLength %s should be in range (1-16) for a long decimal", typeLength); + this.typeLength = typeLength; + this.inputBytes = new byte[typeLength]; + this.byteBuffer = requireNonNull(byteBuffer, "byteBuffer is null"); + this.bufferOffset = bufferOffset; + this.bufferEnd = bufferOffset + length; + } + + @Override + public void readNext(long[] values, int offset, int length) + { + int localBufferOffset = bufferOffset; + int endOffset = (offset + length) * 2; + + for (int currentOutputOffset = offset * 2; currentOutputOffset < endOffset; currentOutputOffset += 2) { + System.arraycopy(byteBuffer, localBufferOffset, inputBytes, 0, typeLength); + Slice slice = Decimals.encodeUnscaledValue(new BigInteger(inputBytes)); + values[currentOutputOffset] = slice.getLong(0); + values[currentOutputOffset + 1] = slice.getLong(SIZE_OF_LONG); + + localBufferOffset += typeLength; + } + + 
bufferOffset = localBufferOffset; + } + + @Override + public void skip(int length) + { + checkArgument(bufferOffset + length * typeLength <= bufferEnd, "End of stream: invalid read request"); + checkArgument(length >= 0, "invalid length %s", length); + bufferOffset += length * typeLength; + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + sizeOf(byteBuffer) + sizeOf(inputBytes); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayShortDecimalPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayShortDecimalPlainValuesDecoder.java new file mode 100644 index 0000000000000..49d2b4fb7b10c --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/FixedLenByteArrayShortDecimalPlainValuesDecoder.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.plain; + +import com.facebook.presto.parquet.batchreader.SimpleSliceInputStream; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; +import com.facebook.presto.spi.PrestoException; +import io.airlift.slice.Slices; +import org.apache.parquet.column.ColumnDescriptor; +import org.openjdk.jol.info.ClassLayout; + +import static com.facebook.presto.parquet.ParquetTypeUtils.getShortDecimalValue; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class FixedLenByteArrayShortDecimalPlainValuesDecoder + implements ShortDecimalValuesDecoder +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(FixedLenByteArrayShortDecimalPlainValuesDecoder.class).instanceSize(); + + private final ColumnDescriptor columnDescriptor; + private final int typeLength; + private final ShortDecimalFixedWidthByteArrayBatchDecoder decimalValueDecoder; + private final SimpleSliceInputStream input; + + public FixedLenByteArrayShortDecimalPlainValuesDecoder(ColumnDescriptor columnDescriptor, byte[] byteBuffer, int bufferOffset, int length) + { + this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null"); + this.typeLength = columnDescriptor.getPrimitiveType().getTypeLength(); + checkArgument(typeLength > 0 && typeLength <= 16, "Expected column %s to have type length in range (1-16)", columnDescriptor); + this.decimalValueDecoder = new ShortDecimalFixedWidthByteArrayBatchDecoder(Math.min(typeLength, Long.BYTES)); + input = new SimpleSliceInputStream(Slices.wrappedBuffer(requireNonNull(byteBuffer, "byteBuffer is null"), bufferOffset, length)); + } + + @Override + public void readNext(long[] values, int offset, int length) + { + input.ensureBytesAvailable(typeLength * length); + if (typeLength <= Long.BYTES) { + 
decimalValueDecoder.getShortDecimalValues(input, values, offset, length); + return; + } + int extraBytesLength = typeLength - Long.BYTES; + byte[] inputBytes = input.getByteArray(); + int inputBytesOffset = input.getByteArrayOffset(); + for (int i = offset; i < offset + length; i++) { + checkBytesFitInShortDecimal(inputBytes, inputBytesOffset, extraBytesLength, columnDescriptor); + values[i] = getShortDecimalValue(inputBytes, inputBytesOffset + extraBytesLength, Long.BYTES); + inputBytesOffset += typeLength; + } + input.skip(length * typeLength); + } + + public static void checkBytesFitInShortDecimal(byte[] bytes, int offset, int length, ColumnDescriptor descriptor) + { + int endOffset = offset + length; + // Equivalent to expectedValue = bytes[endOffset] < 0 ? -1 : 0 + byte expectedValue = (byte) (bytes[endOffset] >> 7); + for (int i = offset; i < endOffset; i++) { + if (bytes[i] != expectedValue) { + throw new PrestoException(NOT_SUPPORTED, "Could not read unscaled value into a short decimal from column " + descriptor); + } + } + } + + @Override + public void skip(int length) + { + checkArgument(length >= 0, "invalid length %s", length); + input.skip(length * typeLength); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int32ShortDecimalPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int32ShortDecimalPlainValuesDecoder.java new file mode 100644 index 0000000000000..3ecfd948a32dc --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int32ShortDecimalPlainValuesDecoder.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.plain; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; + +public class Int32ShortDecimalPlainValuesDecoder + extends Int32PlainValuesDecoder + implements ShortDecimalValuesDecoder +{ + public Int32ShortDecimalPlainValuesDecoder(byte[] byteBuffer, int bufferOffset, int length) + { + super(byteBuffer, bufferOffset, length); + } + + @Override + public void readNext(long[] values, int offset, int length) + { + int[] tempValues = new int[length]; + super.readNext(tempValues, 0, length); + for (int i = 0; i < length; i++) { + values[offset + i] = tempValues[i]; + } + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int64ShortDecimalPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int64ShortDecimalPlainValuesDecoder.java new file mode 100644 index 0000000000000..046bf288d5d96 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int64ShortDecimalPlainValuesDecoder.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.plain; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; + +public class Int64ShortDecimalPlainValuesDecoder + extends Int64PlainValuesDecoder + implements ShortDecimalValuesDecoder +{ + public Int64ShortDecimalPlainValuesDecoder(byte[] byteBuffer, int bufferOffset, int length) + { + super(byteBuffer, bufferOffset, length); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/ShortDecimalFixedWidthByteArrayBatchDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/ShortDecimalFixedWidthByteArrayBatchDecoder.java new file mode 100644 index 0000000000000..b8bbc54f35e86 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/ShortDecimalFixedWidthByteArrayBatchDecoder.java @@ -0,0 +1,274 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.plain; + +import com.facebook.presto.parquet.batchreader.SimpleSliceInputStream; + +import static com.facebook.presto.parquet.batchreader.BytesUtils.propagateSignBit; +import static com.google.common.base.Preconditions.checkArgument; + +public class ShortDecimalFixedWidthByteArrayBatchDecoder +{ + private static final ShortDecimalDecoder[] VALUE_DECODERS = new ShortDecimalDecoder[] { + new BigEndianReader1(), + new BigEndianReader2(), + new BigEndianReader3(), + new BigEndianReader4(), + new BigEndianReader5(), + new BigEndianReader6(), + new BigEndianReader7(), + new BigEndianReader8() + }; + + public interface ShortDecimalDecoder + { + void decode(SimpleSliceInputStream input, long[] values, int offset, int length); + } + + private final ShortDecimalDecoder decoder; + + public ShortDecimalFixedWidthByteArrayBatchDecoder(int length) + { + checkArgument( + length > 0 && length <= 8, + "Short decimal length %s must be in range 1-8", + length); + decoder = VALUE_DECODERS[length - 1]; + // Unscaled number is encoded as two's complement using big-endian byte order + // (the most significant byte is the zeroth element) + } + + /** + * This method uses Unsafe operations on Slice. + * Always check if needed data is available with ensureBytesAvailable method. + * Failing to do so may result in instant JVM crash. 
+ */ + public void getShortDecimalValues(SimpleSliceInputStream input, long[] values, int offset, int length) + { + decoder.decode(input, values, offset, length); + } + + private static final class BigEndianReader8 + implements ShortDecimalDecoder + { + @Override + public void decode(SimpleSliceInputStream input, long[] values, int offset, int length) + { + int endOffset = offset + length; + for (int i = offset; i < endOffset; i++) { + values[i] = Long.reverseBytes(input.readLongUnsafe()); + } + } + } + + private static final class BigEndianReader7 + implements ShortDecimalDecoder + { + @Override + public void decode(SimpleSliceInputStream input, long[] values, int offset, int length) + { + int bytesOffSet = 0; + int endOffset = offset + length; + for (int i = offset; i < endOffset - 1; i++) { + // We read redundant bytes and then ignore them. Sign bit is propagated by `>>` operator + values[i] = Long.reverseBytes(input.getLongUnsafe(bytesOffSet)) >> 8; + bytesOffSet += 7; + } + // Decode the last one "normally" as it would read data out of bounds + values[endOffset - 1] = decode(input, bytesOffSet); + input.skip(bytesOffSet + 7); + } + + private long decode(SimpleSliceInputStream input, int index) + { + long value = (input.getByteUnsafe(index + 6) & 0xFFL) + | (input.getByteUnsafe(index + 5) & 0xFFL) << 8 + | (input.getByteUnsafe(index + 4) & 0xFFL) << 16 + | (Integer.reverseBytes(input.getIntUnsafe(index)) & 0xFFFFFFFFL) << 24; + return propagateSignBit(value, 8); + } + } + + private static final class BigEndianReader6 + implements ShortDecimalDecoder + { + @Override + public void decode(SimpleSliceInputStream input, long[] values, int offset, int length) + { + int bytesOffSet = 0; + int endOffset = offset + length; + for (int i = offset; i < endOffset - 1; i++) { + // We read redundant bytes and then ignore them. 
Sign bit is propagated by `>>` operator + values[i] = Long.reverseBytes(input.getLongUnsafe(bytesOffSet)) >> 16; + bytesOffSet += 6; + } + // Decode the last one "normally" as it would read data out of bounds + values[endOffset - 1] = decode(input, bytesOffSet); + input.skip(bytesOffSet + 6); + } + + private long decode(SimpleSliceInputStream input, int index) + { + long value = (input.getByteUnsafe(index + 5) & 0xFFL) + | (input.getByteUnsafe(index + 4) & 0xFFL) << 8 + | (Integer.reverseBytes(input.getIntUnsafe(index)) & 0xFFFFFFFFL) << 16; + return propagateSignBit(value, 16); + } + } + + private static final class BigEndianReader5 + implements ShortDecimalDecoder + { + @Override + public void decode(SimpleSliceInputStream input, long[] values, int offset, int length) + { + int bytesOffSet = 0; + int endOffset = offset + length; + for (int i = offset; i < endOffset - 1; i++) { + // We read redundant bytes and then ignore them. Sign bit is propagated by `>>` operator + values[i] = Long.reverseBytes(input.getLongUnsafe(bytesOffSet)) >> 24; + bytesOffSet += 5; + } + // Decode the last one "normally" as it would read data out of bounds + values[endOffset - 1] = decode(input, bytesOffSet); + input.skip(bytesOffSet + 5); + } + + private long decode(SimpleSliceInputStream input, int index) + { + long value = (input.getByteUnsafe(index + 4) & 0xFFL) + | (Integer.reverseBytes(input.getIntUnsafe(index)) & 0xFFFFFFFFL) << 8; + return propagateSignBit(value, 24); + } + } + + private static final class BigEndianReader4 + implements ShortDecimalDecoder + { + @Override + public void decode(SimpleSliceInputStream input, long[] values, int offset, int length) + { + while (length > 1) { + long value = Long.reverseBytes(input.readLongUnsafe()); + + // Implicit cast will propagate the sign bit correctly, as it is performed after the byte reversal. 
+ values[offset] = (int) (value >> 32); + values[offset + 1] = (int) value; + + offset += 2; + length -= 2; + } + + if (length > 0) { + int value = input.readIntUnsafe(); + values[offset] = Integer.reverseBytes(value); + } + } + } + + private static final class BigEndianReader3 + implements ShortDecimalDecoder + { + @Override + public void decode(SimpleSliceInputStream input, long[] values, int offset, int length) + { + int bytesOffSet = 0; + int endOffset = offset + length; + int i = offset; + for (; i < endOffset - 2; i += 2) { + // We read redundant bytes and then ignore them. Sign bit is propagated by `>>` operator + long value = Long.reverseBytes(input.getLongUnsafe(bytesOffSet)); + values[i] = value >> 40; + values[i + 1] = value << 24 >> 40; + bytesOffSet += 6; + } + // Decode the last values "normally" as it would read data out of bounds + while (i < endOffset) { + values[i++] = decode(input, bytesOffSet); + bytesOffSet += 3; + } + input.skip(bytesOffSet); + } + + private long decode(SimpleSliceInputStream input, int index) + { + long value = (input.getByteUnsafe(index + 2) & 0xFFL) + | (input.getByteUnsafe(index + 1) & 0xFFL) << 8 + | (input.getByteUnsafe(index) & 0xFFL) << 16; + return propagateSignBit(value, 40); + } + } + + private static final class BigEndianReader2 + implements ShortDecimalDecoder + { + @Override + public void decode(SimpleSliceInputStream input, long[] values, int offset, int length) + { + while (length > 3) { + long value = input.readLongUnsafe(); + // Reverse all bytes at once + value = Long.reverseBytes(value); + + // We first shift the byte as left as possible. 
Then, when shifting back right, + // the sign bit will get propagated + values[offset] = value >> 48; + values[offset + 1] = (value << 16) >> 48; + values[offset + 2] = (value << 32) >> 48; + values[offset + 3] = (value << 48) >> 48; + + offset += 4; + length -= 4; + } + + while (length > 0) { + // Implicit cast will propagate the sign bit correctly, as it is performed after the byte reversal. + values[offset++] = Short.reverseBytes(input.readShort()); + length--; + } + } + } + + private static final class BigEndianReader1 + implements ShortDecimalDecoder + { + @Override + public void decode(SimpleSliceInputStream input, long[] values, int offset, int length) + { + while (length > 7) { + long value = input.readLongUnsafe(); + + // We first shift the byte as left as possible. Then, when shifting back right, + // the sign bit will get propagated + values[offset] = (value << 56) >> 56; + values[offset + 1] = (value << 48) >> 56; + values[offset + 2] = (value << 40) >> 56; + values[offset + 3] = (value << 32) >> 56; + values[offset + 4] = (value << 24) >> 56; + values[offset + 5] = (value << 16) >> 56; + values[offset + 6] = (value << 8) >> 56; + values[offset + 7] = value >> 56; + + offset += 8; + length -= 8; + } + + while (length > 0) { + // Implicit cast will propagate the sign bit correctly + values[offset++] = input.readByte(); + length--; + } + } + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/LongDecimalRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/LongDecimalRLEDictionaryValuesDecoder.java new file mode 100644 index 0000000000000..b130aa416719d --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/LongDecimalRLEDictionaryValuesDecoder.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.batchreader.decoders.rle; + +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BinaryValuesDecoder.ValueBuffer; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.LongDecimalValuesDecoder; +import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary; +import io.airlift.slice.Slice; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; + +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static java.util.Objects.requireNonNull; + +public class LongDecimalRLEDictionaryValuesDecoder + extends BaseRLEBitPackedDecoder + implements LongDecimalValuesDecoder +{ + private final BinaryRLEDictionaryValuesDecoder delegate; + + public LongDecimalRLEDictionaryValuesDecoder(int bitWidth, InputStream inputStream, BinaryBatchDictionary dictionary) + { + super(Integer.MAX_VALUE, bitWidth, inputStream); + requireNonNull(dictionary, "dictionary is null"); + delegate = new BinaryRLEDictionaryValuesDecoder(bitWidth, inputStream, dictionary); + } + + @Override + public void readNext(long[] values, int offset, int length) + throws IOException + { + ValueBuffer valueBuffer = delegate.readNext(length); + int bufferSize = valueBuffer.getBufferSize(); + byte[] byteBuffer = new byte[bufferSize]; + int[] offsets = new int[bufferSize + 1]; + delegate.readIntoBuffer(byteBuffer, 0, offsets, 0, valueBuffer); + + for (int i = 0; i < length; i++) { + int positionOffset = 
offsets[i]; + int positionLength = offsets[i + 1] - positionOffset; + byte[] temp = new byte[positionLength]; + System.arraycopy(byteBuffer, positionOffset, temp, 0, positionLength); + Slice slice = Decimals.encodeUnscaledValue(new BigInteger(temp)); + values[2 * (offset + i)] = slice.getLong(0); + values[2 * (offset + i) + 1] = slice.getLong(SIZE_OF_LONG); + } + } + + @Override + public void skip(int length) + throws IOException + { + delegate.skip(length); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/ShortDecimalRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/ShortDecimalRLEDictionaryValuesDecoder.java new file mode 100644 index 0000000000000..23e0ff931f5ac --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/ShortDecimalRLEDictionaryValuesDecoder.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader.decoders.rle; + +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BinaryValuesDecoder.ValueBuffer; +import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder; +import com.facebook.presto.parquet.batchreader.dictionary.BinaryBatchDictionary; +import org.apache.parquet.io.ParquetDecodingException; + +import java.io.IOException; +import java.io.InputStream; + +import static com.facebook.presto.parquet.ParquetTypeUtils.getShortDecimalValue; +import static java.util.Objects.requireNonNull; + +public class ShortDecimalRLEDictionaryValuesDecoder + extends BaseRLEBitPackedDecoder + implements ShortDecimalValuesDecoder +{ + private final BinaryRLEDictionaryValuesDecoder delegate; + + public ShortDecimalRLEDictionaryValuesDecoder(int bitWidth, InputStream inputStream, BinaryBatchDictionary dictionary) + { + super(Integer.MAX_VALUE, bitWidth, inputStream); + requireNonNull(dictionary, "dictionary is null"); + delegate = new BinaryRLEDictionaryValuesDecoder(bitWidth, inputStream, dictionary); + } + + @Override + public void readNext(long[] values, int offset, int length) + throws IOException + { + ValueBuffer valueBuffer = delegate.readNext(length); + int bufferSize = valueBuffer.getBufferSize(); + byte[] byteBuffer = new byte[bufferSize]; + int[] offsets = new int[bufferSize + 1]; + delegate.readIntoBuffer(byteBuffer, 0, offsets, 0, valueBuffer); + + for (int i = 0; i < length; i++) { + int positionOffset = offsets[i]; + int positionLength = offsets[i + 1] - positionOffset; + if (positionLength > 8) { + throw new ParquetDecodingException("Unable to read BINARY type decimal of size " + positionLength + " as a short decimal"); + } + + values[offset + i] = getShortDecimalValue(byteBuffer, positionOffset, positionLength); + } + } + + @Override + public void skip(int length) + throws IOException + { + delegate.skip(length); + } +} diff --git 
a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/dictionary/BinaryBatchDictionary.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/dictionary/BinaryBatchDictionary.java index 234de5a879d9e..9ddf8cdb7a506 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/dictionary/BinaryBatchDictionary.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/dictionary/BinaryBatchDictionary.java @@ -33,8 +33,14 @@ public final class BinaryBatchDictionary private final byte[] pageBuffer; private final int dictionarySize; private final int[] offsets; + private final Integer length; public BinaryBatchDictionary(DictionaryPage dictionaryPage) + { + this(dictionaryPage, null); + } + + public BinaryBatchDictionary(DictionaryPage dictionaryPage, Integer length) { super(dictionaryPage.getEncoding()); requireNonNull(dictionaryPage, "dictionaryPage is null"); @@ -42,16 +48,29 @@ public BinaryBatchDictionary(DictionaryPage dictionaryPage) this.dictionarySize = dictionaryPage.getDictionarySize(); this.pageBuffer = requireNonNull(dictionaryPage.getSlice(), "dictionary slice is null").getBytes(); + this.length = length; // initialize the offsets array IntList offsetList = new IntArrayList(); int offset = 0; - while (offset < pageBuffer.length) { - int length = BytesUtils.getInt(pageBuffer, offset); + if (length == null) { + while (offset < pageBuffer.length) { + int len = BytesUtils.getInt(pageBuffer, offset); + offsetList.add(offset); + offset += (4 + len); + } + offsetList.add(offset); + } + else { + int index = 0; + while (index < dictionarySize) { + offsetList.add(offset); + offset += length; + index++; + } offsetList.add(offset); - offset += (4 + length); } - offsetList.add(offset); + this.offsets = offsetList.toIntArray(); checkArgument(offsets.length - 1 == dictionarySize, "Dictionary size and number of entries don't match"); @@ -60,14 +79,25 @@ public 
BinaryBatchDictionary(DictionaryPage dictionaryPage) public int getLength(int dictionaryId) { checkArgument(dictionaryId >= 0 && dictionaryId < dictionarySize, "invalid dictionary id: %s", dictionaryId); - return offsets[dictionaryId + 1] - (offsets[dictionaryId] + 4); + if (length == null) { + return offsets[dictionaryId + 1] - (offsets[dictionaryId] + 4); + } + else { + return length; + } } public int copyTo(byte[] byteBuffer, int offset, int dictionaryId) { - int length = offsets[dictionaryId + 1] - (offsets[dictionaryId] + 4); - System.arraycopy(pageBuffer, offsets[dictionaryId] + 4, byteBuffer, offset, length); - return length; + if (length == null) { + int len = offsets[dictionaryId + 1] - (offsets[dictionaryId] + 4); + System.arraycopy(pageBuffer, offsets[dictionaryId] + 4, byteBuffer, offset, len); + return len; + } + else { + System.arraycopy(pageBuffer, offsets[dictionaryId], byteBuffer, offset, length); + return length; + } } @Override diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/dictionary/Dictionaries.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/dictionary/Dictionaries.java index f3a22eb9603a2..46ccdaca4b0f3 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/dictionary/Dictionaries.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/dictionary/Dictionaries.java @@ -44,6 +44,7 @@ public static Dictionary createDictionary(ColumnDescriptor columnDescriptor, Dic case BINARY: return new BinaryBatchDictionary(dictionaryPage); case FIXED_LEN_BYTE_ARRAY: + return new BinaryBatchDictionary(dictionaryPage, columnDescriptor.getTypeLength()); case BOOLEAN: default: break; diff --git a/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd b/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd index 788901fb7de94..6d7156dfe863a 100644 --- a/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd +++ 
b/presto-parquet/src/main/resources/freemarker/data/ParquetTypes.tdd @@ -23,6 +23,18 @@ blockType: "LongArrayBlock", valuesDecoder: "Int64TimestampMicrosValuesDecoder", primitiveType: "long" - } + }, + { + classNamePrefix: "ShortDecimal", + blockType: "LongArrayBlock", + valuesDecoder: "ShortDecimalValuesDecoder", + primitiveType: "long" + }, + { + classNamePrefix: "LongDecimal", + blockType: "Int128ArrayBlock", + valuesDecoder: "LongDecimalValuesDecoder", + primitiveType: "long" + }, ] } diff --git a/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd b/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd index d342473d6a4c5..0b11669d8026b 100644 --- a/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd +++ b/presto-parquet/src/main/resources/freemarker/templates/ParquetFlatColumnReaderGenerator.tdd @@ -4,6 +4,11 @@ <#assign updatedTemplate = updatedTemplate?replace("com.facebook.presto.common.block.IntArrayBlock", "com.facebook.presto.common.block.${type.blockType}")> <#assign updatedTemplate = updatedTemplate?replace("IntArrayBlock", "${type.blockType}")> <#assign updatedTemplate = updatedTemplate?replace("Int32ValuesDecoder", "${type.valuesDecoder}")> +<#if !type.classNamePrefix?starts_with("LongDecimal")> <#assign updatedTemplate = updatedTemplate?replace("int[] values = new int[nextBatchSize]", "${type.primitiveType}[] values = new ${type.primitiveType}[nextBatchSize]")> +<#else> +<#assign updatedTemplate = updatedTemplate?replace("int[] values = new int[nextBatchSize]", "${type.primitiveType}[] values = new ${type.primitiveType}[nextBatchSize * 2]")> +<#assign updatedTemplate = updatedTemplate?replace("values[valueDestinationIndex] = values[valueSourceIndex]", "values[valueDestinationIndex * 2 + 1] = values[valueSourceIndex * 2 + 1];\n values[valueDestinationIndex * 2] = values[valueSourceIndex * 2]")> +</#if> ${updatedTemplate} \ No newline at 
end of file diff --git a/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd b/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd index f8f66da761325..f19a2140d7d45 100644 --- a/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd +++ b/presto-parquet/src/main/resources/freemarker/templates/ParquetNestedColumnReaderGenerator.tdd @@ -1,4 +1,5 @@ <#list parquetTypes.flatTypes as type> +<#if !type.classNamePrefix?ends_with("Decimal")> <@pp.changeOutputFile name="/com/facebook/presto/parquet/batchreader/${type.classNamePrefix}NestedBatchReader.java" /> <#assign updatedTemplate = nestedTypeTemplate?replace("Int32NestedBatchReader", "${type.classNamePrefix}NestedBatchReader")> <#assign updatedTemplate = updatedTemplate?replace("com.facebook.presto.common.block.IntArrayBlock", "com.facebook.presto.common.block.${type.blockType}")> @@ -6,4 +7,5 @@ <#assign updatedTemplate = updatedTemplate?replace("Int32ValuesDecoder", "${type.valuesDecoder}")> <#assign updatedTemplate = updatedTemplate?replace("int[] values = new int[newBatchSize]", "${type.primitiveType}[] values = new ${type.primitiveType}[newBatchSize]")> ${updatedTemplate} +</#if> \ No newline at end of file diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java index 5ca2864671d78..c1b56956e16cf 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java @@ -15,7 +15,9 @@ import com.facebook.presto.common.block.Block; import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.DecimalType; import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.SqlDecimal; import 
com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.parquet.cache.MetadataReader; @@ -77,7 +79,7 @@ public class BenchmarkParquetReader { public static final int ROWS = 10_000_000; - private static final boolean enableOptimizedReader = true; + private static final boolean enableBatchReader = true; private static final boolean enableVerification = false; public static void main(String[] args) @@ -160,6 +162,34 @@ public Object readInt96WithNull(Int96WithNullBenchmarkData data) return read(data); } + @Benchmark + public Object readShortDecimalNoNull(ShortDecimalNoNullBenchmarkData data) + throws Throwable + { + return read(data); + } + + @Benchmark + public Object readShortDecimalWithNull(ShortDecimalWithNullBenchmarkData data) + throws Throwable + { + return read(data); + } + + @Benchmark + public Object readLongDecimalNoNull(LongDecimalNoNullBenchmarkData data) + throws Throwable + { + return read(data); + } + + @Benchmark + public Object readLongDecimalWithNull(LongDecimalWithNullBenchmarkData data) + throws Throwable + { + return read(data); + } + @Benchmark public Object readSliceDictionaryNoNull(VarcharNoNullBenchmarkData data) throws Throwable @@ -279,7 +309,7 @@ ParquetReader createRecordReader() this.field = ColumnIOConverter.constructField(getType(), messageColumnIO.getChild(0)).get(); - return new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), enableOptimizedReader, enableVerification, null, null, false, Optional.empty()); + return new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), enableBatchReader, enableVerification, null, null, false, Optional.empty()); } protected boolean getNullability() @@ -504,6 +534,112 @@ protected Type getType() } } + @State(Scope.Thread) + public static 
class ShortDecimalNoNullBenchmarkData + extends BenchmarkData + { + @Override + protected Type getType() + { + return DecimalType.createDecimalType(11, 10); + } + + @Override + protected List generateValues() + { + List values = new ArrayList<>(); + for (int i = 0; i < ROWS; ++i) { + values.add(SqlDecimal.of(random.nextInt(), 11, 10)); + } + return values; + } + + @Override + protected boolean getNullability() + { + return false; + } + } + + @State(Scope.Thread) + public static class ShortDecimalWithNullBenchmarkData + extends BenchmarkData + { + @Override + protected List generateValues() + { + List values = new ArrayList<>(); + for (int i = 0; i < ROWS; ++i) { + if (random.nextBoolean()) { + values.add(SqlDecimal.of(random.nextInt(), 18, 18)); + } + else { + values.add(null); + } + } + return values; + } + + @Override + protected Type getType() + { + return DecimalType.createDecimalType(18, 18); + } + } + + @State(Scope.Thread) + public static class LongDecimalNoNullBenchmarkData + extends BenchmarkData + { + @Override + protected Type getType() + { + return DecimalType.createDecimalType(38, 18); + } + + @Override + protected List generateValues() + { + List values = new ArrayList<>(); + for (int i = 0; i < ROWS; ++i) { + values.add(SqlDecimal.of(random.nextInt(), 38, 18)); + } + return values; + } + + @Override + protected boolean getNullability() + { + return false; + } + } + + @State(Scope.Thread) + public static class LongDecimalWithNullBenchmarkData + extends BenchmarkData + { + @Override + protected List generateValues() + { + List values = new ArrayList<>(); + for (int i = 0; i < ROWS; ++i) { + if (random.nextBoolean()) { + values.add(SqlDecimal.of(random.nextInt(), 38, 18)); + } + else { + values.add(null); + } + } + return values; + } + + @Override + protected Type getType() + { + return DecimalType.createDecimalType(38, 18); + } + } + @State(Scope.Thread) + public static class VarcharNoNullBenchmarkData extends BenchmarkData @@ -907,6 +1043,22 @@ 
protected Type getType() dataInt96WithNull.setup(); benchmark.readInt96WithNull(dataInt96WithNull); + ShortDecimalNoNullBenchmarkData dataShortDecimalNoNull = new ShortDecimalNoNullBenchmarkData(); + dataShortDecimalNoNull.setup(); + benchmark.readShortDecimalNoNull(dataShortDecimalNoNull); + + ShortDecimalWithNullBenchmarkData dataShortDecimalWithNull = new ShortDecimalWithNullBenchmarkData(); + dataShortDecimalWithNull.setup(); + benchmark.readShortDecimalWithNull(dataShortDecimalWithNull); + + LongDecimalNoNullBenchmarkData dataLongDecimalNoNull = new LongDecimalNoNullBenchmarkData(); + dataLongDecimalNoNull.setup(); + benchmark.readLongDecimalNoNull(dataLongDecimalNoNull); + + LongDecimalWithNullBenchmarkData dataLongDecimalWithNull = new LongDecimalWithNullBenchmarkData(); + dataLongDecimalWithNull.setup(); + benchmark.readLongDecimalWithNull(dataLongDecimalWithNull); + VarcharNoNullBenchmarkData dataVarcharNoNull = new VarcharNoNullBenchmarkData(); dataVarcharNoNull.setup(); benchmark.readSliceDictionaryNoNull(dataVarcharNoNull); diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/ParquetTestUtils.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/ParquetTestUtils.java index 484d443bfce69..e143ff35c691d 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/ParquetTestUtils.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/ParquetTestUtils.java @@ -134,6 +134,7 @@ static void writeParquetColumnHive(File file, String columnName, boolean nullabl jobConf.setLong(ParquetOutputFormat.BLOCK_SIZE, new DataSize(256, MEGABYTE).toBytes()); jobConf.setLong(ParquetOutputFormat.PAGE_SIZE, new DataSize(100, KILOBYTE).toBytes()); jobConf.set(ParquetOutputFormat.COMPRESSION, "snappy"); + jobConf.setBoolean(org.apache.parquet.hadoop.ParquetOutputFormat.ENABLE_DICTIONARY, false); Properties properties = new Properties(); properties.setProperty("columns", columnName); @@ -439,7 +440,8 @@ else if (type instanceof 
PrimitiveType) { if (primitiveType.getDecimalMetadata() != null) { builder = (Types.PrimitiveBuilder) builder.scale(primitiveType.getDecimalMetadata().getScale()) - .precision(primitiveType.getDecimalMetadata().getPrecision()); + .precision(primitiveType.getDecimalMetadata().getPrecision()) + .as(primitiveType.getOriginalType()); } return builder.length(primitiveType.getTypeLength()) diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/TestSimpleSliceInputStream.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/TestSimpleSliceInputStream.java new file mode 100644 index 0000000000000..5765032bb4877 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/TestSimpleSliceInputStream.java @@ -0,0 +1,160 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.batchreader; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class TestSimpleSliceInputStream +{ + @DataProvider(name = "dataTypes") + public Object[][] dataTypes() + { + return new Object[][] { + {Byte.TYPE, Byte.BYTES}, + {Short.TYPE, Short.BYTES}, + {Integer.TYPE, Integer.BYTES}, + {Long.TYPE, Long.BYTES}, + }; + } + + @Test(dataProvider = "dataTypes") + public void testReadSimpleType(Class clazz, int sizeOfType) + { + int numElements = 100; + Slice slice = Slices.allocate(sizeOfType * numElements); + for (int i = 0; i < numElements; i++) { + switch (clazz.getName()) { + case "byte": + slice.setByte(i, i); + break; + case "short": + slice.setShort(i * sizeOfType, i); + break; + case "int": + slice.setInt(i * sizeOfType, i); + break; + case "long": + slice.setLong(i * sizeOfType, i); + break; + default: + } + } + + SimpleSliceInputStream simpleSliceInputStream = new SimpleSliceInputStream(slice); + for (int i = 0; i < numElements; i++) { + switch (clazz.getName()) { + case "byte": + byte actualByte = simpleSliceInputStream.readByte(); + assertEquals(actualByte, i); + break; + case "short": + short actualShort = simpleSliceInputStream.readShort(); + assertEquals(actualShort, i); + break; + case "int": + int actualInt = simpleSliceInputStream.readInt(); + assertEquals(actualInt, i); + break; + case "long": + long actualLong = simpleSliceInputStream.readLong(); + assertEquals(actualLong, i); + break; + default: + } + } + } + + @Test + public void testReadBytes() + { + int numElements = 100; + Slice slice = Slices.allocate(2 * numElements); + byte[] expected = new byte[2 * numElements]; + + int offset = 0; + for (int i = 0; i < numElements; i++) { + String str = "" + i; + slice.setBytes(offset, str.getBytes()); + int length = str.getBytes().length; + 
System.arraycopy(str.getBytes(), 0, expected, offset, length); + offset += length; + } + + SimpleSliceInputStream simpleSliceInputStream = new SimpleSliceInputStream(slice); + byte[] actual = simpleSliceInputStream.readBytes(); + assertEquals(actual, expected); + } + + @Test + public void testReadMixData() + { + Slice slice = Slices.allocate(Byte.BYTES + Short.BYTES + Integer.BYTES + Long.BYTES); + + int offset = 0; + + // Write byte + slice.setByte(offset, Byte.MAX_VALUE); + offset += Byte.BYTES; + + // Write short + slice.setShort(offset, Short.MAX_VALUE); + offset += Short.BYTES; + + // Write int + slice.setInt(offset, Integer.MAX_VALUE); + offset += Integer.BYTES; + + // Write long + slice.setLong(offset, Long.MAX_VALUE); + + SimpleSliceInputStream simpleSliceInputStream = new SimpleSliceInputStream(slice); + byte actualByte = simpleSliceInputStream.readByte(); + assertEquals(actualByte, Byte.MAX_VALUE); + + short actualShort = simpleSliceInputStream.readShort(); + assertEquals(actualShort, Short.MAX_VALUE); + + int actualInt = simpleSliceInputStream.readInt(); + assertEquals(actualInt, Integer.MAX_VALUE); + + long actualLong = simpleSliceInputStream.readLong(); + assertEquals(actualLong, Long.MAX_VALUE); + } + + @Test + public void testGetByteArray() + { + int numElements = 100; + Slice slice = Slices.allocate(2 * numElements); + byte[] expected = new byte[2 * numElements]; + + int offset = 0; + for (int i = 0; i < numElements; i++) { + String str = "" + i; + slice.setBytes(offset, str.getBytes()); + int length = str.getBytes().length; + System.arraycopy(str.getBytes(), 0, expected, offset, length); + offset += length; + } + + SimpleSliceInputStream simpleSliceInputStream = new SimpleSliceInputStream(slice); + byte[] actual = simpleSliceInputStream.getByteArray(); + assertEquals(actual, expected); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java 
b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java new file mode 100644 index 0000000000000..a3768d560ca37 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/AbstractColumnReaderBenchmark.java @@ -0,0 +1,187 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.parquet.ColumnReader; +import com.facebook.presto.parquet.ColumnReaderFactory; +import com.facebook.presto.parquet.DataPage; +import com.facebook.presto.parquet.DataPageV1; +import com.facebook.presto.parquet.ParquetEncoding; +import com.facebook.presto.parquet.PrimitiveField; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.apache.parquet.column.values.ValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import 
org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; +import org.openjdk.jmh.runner.options.WarmupMode; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.parquet.ParquetEncoding.RLE; +import static com.facebook.presto.parquet.ParquetResultVerifierUtils.verifyColumnChunks; +import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetEncoding; +import static java.lang.String.format; +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; + +@State(Scope.Thread) +@OutputTimeUnit(SECONDS) +@Measurement(iterations = 10, time = 500, timeUnit = MILLISECONDS) +@Warmup(iterations = 5, time = 500, timeUnit = MILLISECONDS) +@Fork(2) +public abstract class AbstractColumnReaderBenchmark +{ + // Parquet pages are usually about 1MB + private static final int MIN_PAGE_SIZE = 1_000_000; + private static final int OUTPUT_BUFFER_SIZE = MIN_PAGE_SIZE * 2; // Needs to be more than MIN_PAGE_SIZE + private static final int MAX_VALUES = 1_000_000; + + private static final int DATA_GENERATION_BATCH_SIZE = 16384; + private static final int READ_BATCH_SIZE = 4096; + + private static final boolean ENABLE_VERIFICATION = true; + + private final List dataPages = new ArrayList<>(); + private int dataPositions; + + protected PrimitiveField field; + // + @Param({ + "PLAIN", "DELTA_BYTE_ARRAY" + }) + public ParquetEncoding parquetEncoding; + + @Param({ + "true", "false", + }) + public boolean enableOptimizedReader; + + protected abstract PrimitiveField createPrimitiveField(); + + protected abstract ValuesWriter createValuesWriter(int bufferSize); + + protected abstract T 
generateDataBatch(int size); + + protected abstract void writeValue(ValuesWriter writer, T batch, int index); + + protected abstract boolean getEnableOptimizedReader(); + + @Setup + public void setup() + throws IOException + { + this.field = createPrimitiveField(); + + ValuesWriter writer = createValuesWriter(OUTPUT_BUFFER_SIZE); + int batchIndex = 0; + T batch = generateDataBatch(DATA_GENERATION_BATCH_SIZE); + + while (writer.getBufferedSize() < MIN_PAGE_SIZE && dataPositions < MAX_VALUES) { + if (batchIndex == DATA_GENERATION_BATCH_SIZE) { + dataPages.add(createDataPage(writer, batchIndex)); + batch = generateDataBatch(DATA_GENERATION_BATCH_SIZE); + batchIndex = 0; + } + writeValue(writer, batch, batchIndex++); + dataPositions++; + } + + if (batchIndex > 0) { + dataPages.add(createDataPage(writer, batchIndex)); + } + } + + @Benchmark + public int read() + throws IOException + { + ColumnReader columnReader = ColumnReaderFactory.createReader(field.getDescriptor(), getEnableOptimizedReader()); + columnReader.init(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages).listIterator(), MAX_VALUES, null, null, Optional.empty(), null, -1, -1), field, null); + + ColumnReader reader = null; + if (ENABLE_VERIFICATION) { + reader = ColumnReaderFactory.createReader(field.getDescriptor(), false); + reader.init(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages).listIterator(), MAX_VALUES, null, null, Optional.empty(), null, -1, -1), field, null); + } + + int rowsRead = 0; + while (rowsRead < dataPositions) { + int remaining = dataPositions - rowsRead; + columnReader.prepareNextRead(Math.min(READ_BATCH_SIZE, remaining)); + ColumnChunk columnChunk = columnReader.readNext(); + rowsRead += columnChunk.getBlock().getPositionCount(); + if (ENABLE_VERIFICATION) { + reader.prepareNextRead(Math.min(READ_BATCH_SIZE, remaining)); + ColumnChunk expected = reader.readNext(); + verifyColumnChunks(columnChunk, expected, false, field, null); + } + } + return rowsRead; + } + + private 
DataPage createDataPage(ValuesWriter writer, int valuesCount) + { + Slice data; + try { + data = Slices.wrappedBuffer(writer.getBytes().toByteArray()); + writer.reset(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + return new DataPageV1( + data, + valuesCount, + data.length(), + -1, + null, + RLE, + RLE, + getParquetEncoding(writer.getEncoding())); + } + + protected static void run(Class clazz) + throws RunnerException + { + ChainedOptionsBuilder optionsBuilder = new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .warmupMode(WarmupMode.BULK) + .resultFormat(ResultFormatType.JSON) + .result(format("%s/%s-result-%s.json", System.getProperty("java.io.tmpdir"), clazz.getSimpleName(), ISO_DATE_TIME.format(LocalDateTime.now()))) + .jvmArgsAppend("-Xmx4g", "-Xms4g") + .include("^\\Q" + clazz.getName() + ".\\E"); + + new Runner(optionsBuilder.build()).run(); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkDecimalColumnBatchReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkDecimalColumnBatchReader.java new file mode 100644 index 0000000000000..f0edf743e03fb --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkDecimalColumnBatchReader.java @@ -0,0 +1,519 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.parquet.BenchmarkParquetReader; +import com.facebook.presto.parquet.Field; +import com.facebook.presto.parquet.FileParquetDataSource; +import com.facebook.presto.parquet.cache.MetadataReader; +import io.airlift.units.DataSize; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ColumnIOConverter; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type.Repetition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import 
org.openjdk.jmh.runner.options.VerboseMode; + +import java.io.File; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; +import static com.facebook.presto.parquet.BenchmarkParquetReader.ROWS; +import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO; +import static com.facebook.presto.parquet.reader.TestData.longToBytes; +import static com.facebook.presto.parquet.reader.TestData.maxPrecision; +import static com.facebook.presto.parquet.reader.TestData.unscaledRandomShortDecimalSupplier; +import static com.google.common.io.Files.createTempDir; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static java.lang.String.format; +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.UUID.randomUUID; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0; +import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; +import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; + +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(3) +@Warmup(iterations = 30, time = 500, timeUnit = MILLISECONDS) +@Measurement(iterations = 20, time = 500, timeUnit = MILLISECONDS) +@BenchmarkMode(Mode.AverageTime) +@OperationsPerInvocation(BenchmarkParquetReader.ROWS) +public class BenchmarkDecimalColumnBatchReader +{ + public static final int DICT_PAGE_SIZE = 512; + public static final String FIELD_NAME = "decimal_test_column"; + 
+ @Param({ + "true", "false", + }) + public boolean enableOptimizedReader; + + @Param({ + "true", "false", + }) + public static boolean nullable = true; + + @Param({ + "PARQUET_1_0", "PARQUET_2_0", + }) + // PARQUET_1_0 => PLAIN + // PARQUET_2_0 => DELTA_BYTE_ARRAY, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY + public static WriterVersion writerVersion = PARQUET_2_0; + + public static void main(String[] args) + throws Throwable + { + Options options = new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + BenchmarkDecimalColumnBatchReader.class.getSimpleName() + ".*") + .resultFormat(ResultFormatType.JSON) + .result(format("%s/%s-result-%s.json", System.getProperty("java.io.tmpdir"), BenchmarkDecimalColumnBatchReader.class.getSimpleName(), ISO_DATE_TIME.format(LocalDateTime.now()))) + .shouldFailOnError(true) + .build(); + + new Runner(options).run(); + } + + @Benchmark + public Object readShortDecimalByteArrayLength(ShortDecimalByteArrayLengthBenchmarkData data) + throws Throwable + { + return read(data, enableOptimizedReader); + } + + @Benchmark + public Object readShortDecimal(ShortDecimalBenchmarkData data) + throws Throwable + { + return read(data, enableOptimizedReader); + } + + @Benchmark + public Object readLongDecimal(LongDecimalBenchmarkData data) + throws Throwable + { + return read(data, enableOptimizedReader); + } + + public static Object read(BenchmarkData data, boolean enableOptimizedReader) + throws Exception + { + try (ParquetReader recordReader = data.createRecordReader(enableOptimizedReader)) { + List blocks = new ArrayList<>(); + while (recordReader.nextBatch() > 0) { + Block block = recordReader.readBlock(data.field); + blocks.add(block); + } + return blocks; + } + } + + @State(Scope.Thread) + public static class ShortDecimalByteArrayLengthBenchmarkData + extends BenchmarkData + { + @Param({ + "1", "2", "3", "4", "5", "6", "7", "8", + }) + public int byteArrayLength = 1; + + @Override + protected Type getType() + { + return 
DecimalType.createDecimalType(getPrecision(), getScale()); + } + + @Override + protected String getPrimitiveTypeName() + { + return "FIXED_LEN_BYTE_ARRAY(" + byteArrayLength + ")"; + } + + @Override + protected int getPrecision() + { + return maxPrecision(byteArrayLength); + } + + @Override + protected int getScale() + { + return 1; + } + + @Override + protected MessageType getSchema() + { + DecimalType decimalType = (DecimalType) getType(); + String type = format("DECIMAL(%d,%d)", decimalType.getPrecision(), decimalType.getScale()); + return parseMessageType( + "message test { " + + Repetition.REQUIRED + " " + getPrimitiveTypeName() + " " + FIELD_NAME + " (" + type + "); " + + "} "); + } + + @Override + protected List generateValues() + { + List values = new ArrayList<>(); + int precision = ((DecimalType) getType()).getPrecision(); + long[] dataGen = unscaledRandomShortDecimalSupplier(byteArrayLength * Byte.SIZE, precision).apply(ROWS); + + for (int i = 0; i < ROWS; ++i) { + values.add(Binary.fromConstantByteArray(longToBytes(dataGen[i], byteArrayLength))); + } + return values; + } + } + + @State(Scope.Thread) + public static class ShortDecimalBenchmarkData + extends BenchmarkData + { + @Param({ + "INT32", "INT64", "BINARY", "FIXED_LEN_BYTE_ARRAY(8)", + }) + public static String decimalPrimitiveTypeName = "FIXED_LEN_BYTE_ARRAY(8)"; + + @Override + protected Type getType() + { + return DecimalType.createDecimalType(getPrecision(), getScale()); + } + + @Override + protected String getPrimitiveTypeName() + { + return decimalPrimitiveTypeName; + } + + @Override + protected int getPrecision() + { + switch (getPrimitiveTypeName()) { + case "INT32": + return 9; + default: + return 18; + } + } + + @Override + protected int getScale() + { + switch (getPrimitiveTypeName()) { + case "INT32": + case "INT64": + return 0; + default: + return 12; + } + } + + @Override + protected MessageType getSchema() + { + boolean nullability = getNullability(); + Repetition repetition = 
nullability ? Repetition.OPTIONAL : Repetition.REQUIRED; + + DecimalType decimalType = (DecimalType) getType(); + String type = format("DECIMAL(%d,%d)", decimalType.getPrecision(), decimalType.getScale()); + return parseMessageType( + "message test { " + + repetition + " " + getPrimitiveTypeName() + " " + FIELD_NAME + " (" + type + "); " + + "} "); + } + + @Override + protected List generateValues() + { + List values = new ArrayList<>(); + for (int i = 0; i < ROWS; ++i) { + if (getNullability()) { + if (random.nextBoolean()) { + switch (getPrimitiveTypeName()) { + case "INT32": + values.add(random.nextInt()); + break; + case "INT64": + values.add(random.nextLong()); + break; + default: + values.add(Binary.fromConstantByteArray(longToBytes(random.nextLong(), 8))); + break; + } + } + else { + values.add(null); + } + } + else { + switch (getPrimitiveTypeName()) { + case "INT32": + values.add(random.nextInt()); + break; + case "INT64": + values.add(random.nextLong()); + break; + default: + values.add(Binary.fromConstantByteArray(longToBytes(random.nextLong(), 8))); + break; + } + } + } + return values; + } + + protected boolean getNullability() + { + return nullable; + } + } + + @State(Scope.Thread) + public static class LongDecimalBenchmarkData + extends BenchmarkData + { + @Param({ + "BINARY", "FIXED_LEN_BYTE_ARRAY(16)", + }) + public static String decimalPrimitiveTypeName = "FIXED_LEN_BYTE_ARRAY(16)"; + + @Override + protected Type getType() + { + return DecimalType.createDecimalType(getPrecision(), getScale()); + } + + @Override + protected String getPrimitiveTypeName() + { + return decimalPrimitiveTypeName; + } + + @Override + protected int getPrecision() + { + return 38; + } + + @Override + protected int getScale() + { + return 2; + } + + @Override + protected MessageType getSchema() + { + boolean nullability = getNullability(); + Repetition repetition = nullability ? 
Repetition.OPTIONAL : Repetition.REQUIRED; + + DecimalType decimalType = (DecimalType) getType(); + String type = format("DECIMAL(%d,%d)", decimalType.getPrecision(), decimalType.getScale()); + return parseMessageType( + "message test { " + + repetition + " " + getPrimitiveTypeName() + " " + FIELD_NAME + " (" + type + "); " + + "} "); + } + + @Override + protected List generateValues() + { + List values = new ArrayList<>(); + for (int i = 0; i < ROWS; ++i) { + if (getNullability()) { + if (random.nextBoolean()) { + values.add(Binary.fromConstantByteArray(longToBytes(random.nextLong(), 16))); + } + else { + values.add(null); + } + } + else { + values.add(Binary.fromConstantByteArray(longToBytes(random.nextLong(), 16))); + } + } + return values; + } + + protected boolean getNullability() + { + return nullable; + } + } + + public abstract static class BenchmarkData + { + protected File temporaryDirectory; + protected File file; + protected Random random; + + private Field field; + + @Setup + public void setup() + throws Exception + { + random = new Random(0); + temporaryDirectory = createTempDir(); + file = new File(temporaryDirectory, randomUUID().toString()); + generateData(new Path(file.getAbsolutePath()), getSchema(), generateValues(), getPrimitiveTypeName()); + } + + @TearDown + public void tearDown() + throws IOException + { + deleteRecursively(temporaryDirectory.toPath(), ALLOW_INSECURE); + } + + ParquetReader createRecordReader(boolean enableOptimizedReader) + throws IOException + { + FileParquetDataSource dataSource = new FileParquetDataSource(file); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, file.length(), Optional.empty(), false).getParquetMetadata(); + MessageType schema = parquetMetadata.getFileMetaData().getSchema(); + MessageColumnIO messageColumnIO = getColumnIO(schema, schema); + this.field = ColumnIOConverter.constructField(getType(), messageColumnIO.getChild(0)).get(); + + return new ParquetReader( + messageColumnIO, + 
parquetMetadata.getBlocks(), + Optional.empty(), + dataSource, + newSimpleAggregatedMemoryContext(), + new DataSize(16, MEGABYTE), + enableOptimizedReader, + false, + null, + null, + false, + Optional.empty()); + } + + protected abstract List generateValues(); + + protected abstract MessageType getSchema(); + + protected abstract String getPrimitiveTypeName(); + + protected abstract Type getType(); + + protected abstract int getPrecision(); + + protected abstract int getScale(); + } + + public static void generateData(Path outFile, MessageType schema, List dataList, String primitiveTypeName) + throws IOException + { + System.out.println("Generating data @ " + outFile); + + Configuration configuration = new Configuration(); + GroupWriteSupport.setSchema(schema, configuration); + SimpleGroupFactory f = new SimpleGroupFactory(schema); + ParquetWriter writer = new ParquetWriter( + outFile, + new GroupWriteSupport(), + CompressionCodecName.UNCOMPRESSED, + DEFAULT_BLOCK_SIZE, + DEFAULT_PAGE_SIZE, + DICT_PAGE_SIZE, + true, + false, + writerVersion, + configuration); + + for (Object data : dataList) { + if (data == null) { + writer.write(f.newGroup()); + } + else { + switch (primitiveTypeName) { + case "INT32": + writer.write(f.newGroup().append(FIELD_NAME, (int) data)); + break; + case "INT64": + writer.write(f.newGroup().append(FIELD_NAME, (long) data)); + break; + default: + writer.write(f.newGroup().append(FIELD_NAME, (Binary) data)); + } + } + } + writer.close(); + } + + static { + try { + BenchmarkDecimalColumnBatchReader benchmark = new BenchmarkDecimalColumnBatchReader(); + + ShortDecimalByteArrayLengthBenchmarkData shortDecimalByteArrayLengthBenchmarkData = new ShortDecimalByteArrayLengthBenchmarkData(); + shortDecimalByteArrayLengthBenchmarkData.setup(); + benchmark.readShortDecimalByteArrayLength(shortDecimalByteArrayLengthBenchmarkData); + + ShortDecimalBenchmarkData dataShortDecimal = new ShortDecimalBenchmarkData(); + dataShortDecimal.setup(); + 
benchmark.readShortDecimal(dataShortDecimal); + + LongDecimalBenchmarkData dataLongDecimal = new LongDecimalBenchmarkData(); + dataLongDecimal.setup(); + benchmark.readLongDecimal(dataLongDecimal); + } + catch (Throwable throwable) { + throw new RuntimeException(throwable); + } + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java new file mode 100644 index 0000000000000..32e87327e20fa --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkLongDecimalColumnReader.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.parquet.PrimitiveField; +import com.facebook.presto.parquet.RichColumnDescriptor; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; + +import java.util.Random; + +import static com.facebook.presto.parquet.reader.TestData.randomBigInteger; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; + +public class BenchmarkLongDecimalColumnReader + extends AbstractColumnReaderBenchmark +{ + private static final int LENGTH = 2 * SIZE_OF_LONG; + + private final Random random = new Random(1); + + @Override + protected PrimitiveField createPrimitiveField() + { + PrimitiveType parquetType = Types.optional(FIXED_LEN_BYTE_ARRAY) + .length(LENGTH) + .as(LogicalTypeAnnotation.decimalType(0, 38)) + .named("name"); + + return new PrimitiveField( + DecimalType.createDecimalType(38), + -1, + -1, + true, + new RichColumnDescriptor(new ColumnDescriptor(new String[] {"test"}, parquetType, 0, 0), parquetType), + 0); + } + + @Override + protected ValuesWriter createValuesWriter(int bufferSize) + { + switch (parquetEncoding) { + case PLAIN: + return new FixedLenByteArrayPlainValuesWriter(LENGTH, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + case DELTA_BYTE_ARRAY: + return new DeltaByteArrayWriter(bufferSize, bufferSize, 
HeapByteBufferAllocator.getInstance()); + default: + throw new RuntimeException("Cannot parse parquetEncoding:" + parquetEncoding); + } + } + + @Override + protected void writeValue(ValuesWriter writer, long[] batch, int index) + { + Slice slice = Slices.wrappedLongArray(batch, index * 2, 2); + writer.writeBytes(Binary.fromConstantByteArray(slice.getBytes())); + } + + @Override + protected boolean getEnableOptimizedReader() + { + return enableOptimizedReader; + } + + @Override + protected long[] generateDataBatch(int size) + { + long[] batch = new long[size * 2]; + for (int i = 0; i < size; i++) { + Slice slice = randomBigInteger(random); + batch[i * 2] = slice.getLong(0); + batch[(i * 2) + 1] = slice.getLong(SIZE_OF_LONG); + } + return batch; + } + + public static void main(String[] args) + throws Exception + { + run(BenchmarkLongDecimalColumnReader.class); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java new file mode 100644 index 0000000000000..664f3e69fad5b --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/BenchmarkShortDecimalColumnReader.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.parquet.PrimitiveField; +import com.facebook.presto.parquet.RichColumnDescriptor; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.openjdk.jmh.annotations.Param; + +import static com.facebook.presto.parquet.reader.TestData.longToBytes; +import static com.facebook.presto.parquet.reader.TestData.maxPrecision; +import static com.facebook.presto.parquet.reader.TestData.unscaledRandomShortDecimalSupplier; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; + +public class BenchmarkShortDecimalColumnReader + extends AbstractColumnReaderBenchmark +{ + @Param({ + "1", "2", "3", "4", "5", "6", "7", "8", + }) + public int byteArrayLength; + + @Override + protected PrimitiveField createPrimitiveField() + { + int precision = maxPrecision(byteArrayLength); + PrimitiveType parquetType = Types.optional(FIXED_LEN_BYTE_ARRAY) + .length(byteArrayLength) + .as(LogicalTypeAnnotation.decimalType(0, precision)) + .named("name"); + return new PrimitiveField( + DecimalType.createDecimalType(precision), + -1, + -1, + true, + new RichColumnDescriptor(new ColumnDescriptor(new String[] {"test"}, parquetType, 0, 0), parquetType), + 0); + } + + @Override + protected ValuesWriter createValuesWriter(int bufferSize) + { + switch (parquetEncoding) { + case PLAIN: + return new FixedLenByteArrayPlainValuesWriter(byteArrayLength, bufferSize, bufferSize, 
HeapByteBufferAllocator.getInstance()); + case DELTA_BYTE_ARRAY: + return new DeltaByteArrayWriter(bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + default: + throw new RuntimeException("Cannot parse parquetEncoding:" + parquetEncoding); + } + } + + @Override + protected long[] generateDataBatch(int size) + { + int precision = ((DecimalType) field.getType()).getPrecision(); + return unscaledRandomShortDecimalSupplier(byteArrayLength * Byte.SIZE, precision).apply(size); + } + + @Override + protected boolean getEnableOptimizedReader() + { + return enableOptimizedReader; + } + + @Override + protected void writeValue(ValuesWriter writer, long[] batch, int index) + { + Binary binary = Binary.fromConstantByteArray(longToBytes(batch[index], byteArrayLength)); + writer.writeBytes(binary); + } + + public static void main(String[] args) + throws Exception + { + run(BenchmarkShortDecimalColumnReader.class); + } +} diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestData.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestData.java new file mode 100644 index 0000000000000..3cfe5b0fc1688 --- /dev/null +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/reader/TestData.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.parquet.reader; + +import com.facebook.presto.common.type.Decimals; +import com.google.common.primitives.Longs; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; + +import java.util.Random; +import java.util.function.IntFunction; + +import static com.facebook.presto.parquet.batchreader.BytesUtils.propagateSignBit; +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static java.lang.Math.toIntExact; + +public final class TestData +{ + private TestData() {} + + // Based on org.apache.parquet.schema.Types.BasePrimitiveBuilder.maxPrecision to determine the max decimal precision supported by INT32/INT64 + public static int maxPrecision(int numBytes) + { + return toIntExact( + // convert double to long + Math.round( + // number of base-10 digits + Math.floor(Math.log10( + Math.pow(2, 8 * numBytes - 1) - 1)))); // max value stored in numBytes + } + + public static IntFunction unscaledRandomShortDecimalSupplier(int bitWidth, int precision) + { + long min = (-1 * Decimals.longTenToNth(precision)) + 1; + long max = Decimals.longTenToNth(precision) - 1; + return size -> { + long[] result = new long[size]; + for (int i = 0; i < size; i++) { + result[i] = Math.max( + Math.min(generateData(bitWidth), max), + min); + } + return result; + }; + } + + public static byte[] longToBytes(long value, int length) + { + byte[] result = new byte[length]; + for (int i = length - 1; i >= 0; i--) { + result[i] = (byte) (value & 0xFF); + value >>= Byte.SIZE; + } + return result; + } + + public static Slice randomBigInteger(Random r) + { + byte[] result = new byte[2 * SIZE_OF_LONG]; + byte[] high = randomLong(r, 0, 0x4b3b4ca85a86c47aL); + System.arraycopy(high, 0, result, 0, high.length); + byte[] low = randomLong(r, 0, 0x98a2240000000000L); + System.arraycopy(low, 0, result, 2 * SIZE_OF_LONG - high.length, low.length); + return Slices.wrappedBuffer(result); + } + + 
private static byte[] randomLong(Random r, long min, long max) + { + return Longs.toByteArray(r.nextLong() % (max - min) + min); + } + + public static long generateData(int bitWidth) + { + checkArgument(bitWidth <= 64 && bitWidth > 0, "bit width must be in range 1 - 64 inclusive"); + if (bitWidth == 64) { + return 10; + } + return propagateSignBit(2, 64 - bitWidth); + } +}