-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Add support for decimal batch reader #22636
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,8 +13,7 @@ | |
| */ | ||
| package com.facebook.presto.parquet; | ||
|
|
||
| import com.facebook.presto.common.type.DecimalType; | ||
| import com.facebook.presto.common.type.Type; | ||
| import com.facebook.airlift.log.Logger; | ||
| import com.facebook.presto.parquet.batchreader.BinaryFlatBatchReader; | ||
| import com.facebook.presto.parquet.batchreader.BinaryNestedBatchReader; | ||
| import com.facebook.presto.parquet.batchreader.BooleanFlatBatchReader; | ||
|
|
@@ -25,6 +24,8 @@ | |
| import com.facebook.presto.parquet.batchreader.Int64NestedBatchReader; | ||
| import com.facebook.presto.parquet.batchreader.Int64TimestampMicrosFlatBatchReader; | ||
| import com.facebook.presto.parquet.batchreader.Int64TimestampMicrosNestedBatchReader; | ||
| import com.facebook.presto.parquet.batchreader.LongDecimalFlatBatchReader; | ||
| import com.facebook.presto.parquet.batchreader.ShortDecimalFlatBatchReader; | ||
| import com.facebook.presto.parquet.batchreader.TimestampFlatBatchReader; | ||
| import com.facebook.presto.parquet.batchreader.TimestampNestedBatchReader; | ||
| import com.facebook.presto.parquet.reader.AbstractColumnReader; | ||
|
|
@@ -40,43 +41,67 @@ | |
| import com.facebook.presto.parquet.reader.ShortDecimalColumnReader; | ||
| import com.facebook.presto.parquet.reader.TimestampColumnReader; | ||
| import com.facebook.presto.spi.PrestoException; | ||
| import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; | ||
|
|
||
| import java.util.Optional; | ||
|
|
||
| import static com.facebook.presto.parquet.ParquetTypeUtils.createDecimalType; | ||
| import static com.facebook.presto.parquet.ParquetTypeUtils.isDecimalType; | ||
| import static com.facebook.presto.parquet.ParquetTypeUtils.isShortDecimalType; | ||
| import static com.facebook.presto.parquet.ParquetTypeUtils.isTimeStampMicrosType; | ||
| import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; | ||
| import static org.apache.parquet.schema.OriginalType.DECIMAL; | ||
| import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; | ||
| import static org.apache.parquet.schema.OriginalType.TIME_MICROS; | ||
|
|
||
| public class ColumnReaderFactory | ||
| { | ||
| private static final Logger log = Logger.get(ColumnReaderFactory.class); | ||
| private ColumnReaderFactory() | ||
| { | ||
| } | ||
|
|
||
| public static ColumnReader createReader(RichColumnDescriptor descriptor, boolean batchReadEnabled) | ||
| { | ||
| // decimal is not supported in batch readers | ||
| if (batchReadEnabled && descriptor.getPrimitiveType().getOriginalType() != DECIMAL) { | ||
| if (batchReadEnabled) { | ||
| final boolean isNested = descriptor.getPath().length > 1; | ||
| switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) { | ||
| case BOOLEAN: | ||
| return isNested ? new BooleanNestedBatchReader(descriptor) : new BooleanFlatBatchReader(descriptor); | ||
| case INT32: | ||
| if (!isNested && isShortDecimalType(descriptor)) { | ||
| return new ShortDecimalFlatBatchReader(descriptor); | ||
| } | ||
| case FLOAT: | ||
| return isNested ? new Int32NestedBatchReader(descriptor) : new Int32FlatBatchReader(descriptor); | ||
| case INT64: | ||
| if (isTimeStampMicrosType(descriptor)) { | ||
| return isNested ? new Int64TimestampMicrosNestedBatchReader(descriptor) : new Int64TimestampMicrosFlatBatchReader(descriptor); | ||
| } | ||
|
|
||
| if (!isNested && isShortDecimalType(descriptor)) { | ||
| int precision = ((DecimalLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getPrecision(); | ||
| if (precision < 10) { | ||
| log.warn("PrimitiveTypeName is INT64 but precision is less then 10."); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. delete the warning; it's not actionable or if I'm wrong and this is a real problem, then a waring isn't enough. This should fail outright
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The spec says we need to produce a warning when precision < 10 for INT64. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal |
||
| } | ||
| return new ShortDecimalFlatBatchReader(descriptor); | ||
| } | ||
| case DOUBLE: | ||
| return isNested ? new Int64NestedBatchReader(descriptor) : new Int64FlatBatchReader(descriptor); | ||
| case INT96: | ||
| return isNested ? new TimestampNestedBatchReader(descriptor) : new TimestampFlatBatchReader(descriptor); | ||
| case BINARY: | ||
| Optional<ColumnReader> decimalBatchColumnReader = createDecimalBatchColumnReader(descriptor); | ||
| if (decimalBatchColumnReader.isPresent()) { | ||
| return decimalBatchColumnReader.get(); | ||
| } | ||
|
|
||
| return isNested ? new BinaryNestedBatchReader(descriptor) : new BinaryFlatBatchReader(descriptor); | ||
| case FIXED_LEN_BYTE_ARRAY: | ||
| if (!isNested) { | ||
| decimalBatchColumnReader = createDecimalBatchColumnReader(descriptor); | ||
| if (decimalBatchColumnReader.isPresent()) { | ||
| return decimalBatchColumnReader.get(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -109,17 +134,24 @@ public static ColumnReader createReader(RichColumnDescriptor descriptor, boolean | |
| } | ||
| } | ||
|
|
||
| private static Optional<ColumnReader> createDecimalBatchColumnReader(RichColumnDescriptor descriptor) | ||
| { | ||
| if (isDecimalType(descriptor)) { | ||
| if (isShortDecimalType(descriptor)) { | ||
| return Optional.of(new ShortDecimalFlatBatchReader(descriptor)); | ||
| } | ||
| return Optional.of(new LongDecimalFlatBatchReader(descriptor)); | ||
| } | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| private static Optional<AbstractColumnReader> createDecimalColumnReader(RichColumnDescriptor descriptor) | ||
| { | ||
| Optional<Type> type = createDecimalType(descriptor); | ||
| if (type.isPresent()) { | ||
| DecimalType decimalType = (DecimalType) type.get(); | ||
| if (decimalType.isShort()) { | ||
| if (isDecimalType(descriptor)) { | ||
| if (isShortDecimalType(descriptor)) { | ||
| return Optional.of(new ShortDecimalColumnReader(descriptor)); | ||
| } | ||
| else { | ||
| return Optional.of(new LongDecimalColumnReader(descriptor)); | ||
| } | ||
| return Optional.of(new LongDecimalColumnReader(descriptor)); | ||
| } | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,8 +14,6 @@ | |
| package com.facebook.presto.parquet; | ||
|
|
||
| import com.facebook.presto.common.Subfield; | ||
| import com.facebook.presto.common.type.DecimalType; | ||
| import com.facebook.presto.common.type.Type; | ||
| import com.google.common.collect.ImmutableList; | ||
| import org.apache.parquet.column.ColumnDescriptor; | ||
| import org.apache.parquet.column.Encoding; | ||
|
|
@@ -26,8 +24,9 @@ | |
| import org.apache.parquet.io.MessageColumnIO; | ||
| import org.apache.parquet.io.ParquetDecodingException; | ||
| import org.apache.parquet.io.PrimitiveColumnIO; | ||
| import org.apache.parquet.schema.DecimalMetadata; | ||
| import org.apache.parquet.schema.GroupType; | ||
| import org.apache.parquet.schema.LogicalTypeAnnotation; | ||
| import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; | ||
| import org.apache.parquet.schema.MessageType; | ||
|
|
||
| import java.util.Arrays; | ||
|
|
@@ -37,6 +36,7 @@ | |
| import java.util.Map; | ||
| import java.util.Optional; | ||
|
|
||
| import static com.facebook.presto.common.type.Decimals.MAX_SHORT_PRECISION; | ||
| import static com.google.common.base.Preconditions.checkArgument; | ||
| import static com.google.common.collect.Iterables.getOnlyElement; | ||
| import static java.util.stream.Collectors.joining; | ||
|
|
@@ -229,19 +229,6 @@ public static ColumnIO lookupColumnByName(GroupColumnIO groupColumnIO, String co | |
| return null; | ||
| } | ||
|
|
||
| public static Optional<Type> createDecimalType(RichColumnDescriptor descriptor) | ||
| { | ||
| if (descriptor.getPrimitiveType().getOriginalType() != DECIMAL) { | ||
| return Optional.empty(); | ||
| } | ||
| return Optional.of(createDecimalType(descriptor.getPrimitiveType().getDecimalMetadata())); | ||
| } | ||
|
|
||
| private static Type createDecimalType(DecimalMetadata decimalMetadata) | ||
| { | ||
| return DecimalType.createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale()); | ||
| } | ||
|
|
||
| /** | ||
| * For optional fields: | ||
| * definitionLevel == maxDefinitionLevel => Value is defined | ||
|
|
@@ -253,20 +240,33 @@ public static boolean isValueNull(boolean required, int definitionLevel, int max | |
| return !required && (definitionLevel == maxDefinitionLevel - 1); | ||
| } | ||
|
|
||
| // copied from presto-hive DecimalUtils | ||
| public static long getShortDecimalValue(byte[] bytes) | ||
| { | ||
| long value = 0; | ||
| if ((bytes[0] & 0x80) != 0) { | ||
| for (int i = 0; i < 8 - bytes.length; ++i) { | ||
| value |= 0xFFL << (8 * (7 - i)); | ||
| } | ||
| } | ||
| return getShortDecimalValue(bytes, 0, bytes.length); | ||
| } | ||
|
|
||
| for (int i = 0; i < bytes.length; i++) { | ||
| value |= ((long) bytes[bytes.length - i - 1] & 0xFFL) << (8 * i); | ||
| public static long getShortDecimalValue(byte[] bytes, int startOffset, int length) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this be private or not public?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, this method is used in many places. |
||
| { | ||
| long value = 0; | ||
| switch (length) { | ||
| case 8: | ||
| value |= bytes[startOffset + 7] & 0xFFL; | ||
| case 7: | ||
| value |= (bytes[startOffset + 6] & 0xFFL) << 8; | ||
| case 6: | ||
| value |= (bytes[startOffset + 5] & 0xFFL) << 16; | ||
| case 5: | ||
| value |= (bytes[startOffset + 4] & 0xFFL) << 24; | ||
| case 4: | ||
| value |= (bytes[startOffset + 3] & 0xFFL) << 32; | ||
| case 3: | ||
| value |= (bytes[startOffset + 2] & 0xFFL) << 40; | ||
| case 2: | ||
| value |= (bytes[startOffset + 1] & 0xFFL) << 48; | ||
| case 1: | ||
| value |= (bytes[startOffset] & 0xFFL) << 56; | ||
| } | ||
|
|
||
| value = value >> ((8 - length) * 8); | ||
| return value; | ||
| } | ||
|
|
||
|
|
@@ -335,4 +335,20 @@ public static boolean isTimeStampMicrosType(ColumnDescriptor descriptor) | |
| { | ||
| return TIMESTAMP_MICROS.equals(descriptor.getPrimitiveType().getOriginalType()); | ||
| } | ||
|
|
||
| public static boolean isShortDecimalType(ColumnDescriptor descriptor) | ||
| { | ||
| LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); | ||
| if (!(logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation)) { | ||
| return false; | ||
| } | ||
|
|
||
| DecimalLogicalTypeAnnotation decimalLogicalTypeAnnotation = (DecimalLogicalTypeAnnotation) logicalTypeAnnotation; | ||
| return decimalLogicalTypeAnnotation.getPrecision() <= MAX_SHORT_PRECISION; | ||
| } | ||
|
|
||
| public static boolean isDecimalType(ColumnDescriptor columnDescriptor) | ||
| { | ||
| return columnDescriptor.getPrimitiveType().getOriginalType() == DECIMAL; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,4 +61,9 @@ public static void unpack8Values(byte inByte, byte[] out, int outPos) | |
| out[6 + outPos] = (byte) (inByte >> 6 & 1); | ||
| out[7 + outPos] = (byte) (inByte >> 7 & 1); | ||
| } | ||
|
|
||
| public static long propagateSignBit(long value, int bitsToPad) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method could use a unit test that addresses it directly |
||
| { | ||
| return value << bitsToPad >> bitsToPad; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.