Add support for decimal batch reader#22636
| switch (length) { | ||
| case 8: | ||
| value |= bytes[startOffset + 7] & 0xFFL; | ||
| // fall through |
There was a problem hiding this comment.
This line comment does not seem useful.
The same applies to the following lines.
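For context, the fall-through pattern under discussion can be sketched as below. This is a minimal, self-contained illustration rather than the PR's code: the class name `BigEndianShortDecimal` is hypothetical, and the final sign-extending shift is an assumption about how such a method would typically finish.

```java
final class BigEndianShortDecimal
{
    private BigEndianShortDecimal() {}

    // Reads `length` (1..8) big-endian two's-complement bytes into a long.
    static long getShortDecimalValue(byte[] bytes, int startOffset, int length)
    {
        long value = 0;
        // Every case deliberately falls through to the next one, so a
        // length of N executes exactly N of the statements below.
        switch (length) {
            case 8:
                value |= bytes[startOffset + 7] & 0xFFL;
            case 7:
                value |= (bytes[startOffset + 6] & 0xFFL) << 8;
            case 6:
                value |= (bytes[startOffset + 5] & 0xFFL) << 16;
            case 5:
                value |= (bytes[startOffset + 4] & 0xFFL) << 24;
            case 4:
                value |= (bytes[startOffset + 3] & 0xFFL) << 32;
            case 3:
                value |= (bytes[startOffset + 2] & 0xFFL) << 40;
            case 2:
                value |= (bytes[startOffset + 1] & 0xFFL) << 48;
            case 1:
                value |= (bytes[startOffset] & 0xFFL) << 56;
                break;
            default:
                throw new IllegalArgumentException("length must be between 1 and 8: " + length);
        }
        // The bytes were packed left-aligned; the arithmetic right shift
        // moves them into place and propagates the sign bit.
        return value >> ((8 - length) * 8);
    }
}
```

A single comment above the `switch` documents the intent once, which is one way to avoid repeating `// fall through` on every case.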
| } | ||
|
|
||
| DecimalLogicalTypeAnnotation decimalLogicalTypeAnnotation = (DecimalLogicalTypeAnnotation) logicalTypeAnnotation; | ||
| return decimalLogicalTypeAnnotation.getPrecision() <= Decimals.MAX_SHORT_PRECISION; |
There was a problem hiding this comment.
Use a static import for Decimals.MAX_SHORT_PRECISION.
| public class ShortDecimalFixedWidthByteArrayBatchDecoder | ||
| { | ||
| private static final ShortDecimalDecoder[] VALUE_DECODERS = new ShortDecimalDecoder[] { | ||
| new BigEndianReader1(), |
There was a problem hiding this comment.
Why do we need 7 readers? Could you add a comment?
There was a problem hiding this comment.
This further optimizes the reading speed of short decimals. The implementation of ShortDecimalFixedWidthByteArrayBatchDecoder is based on the Trino implementation: trinodb/trino@f71a815
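One plausible reading of the width-specific readers is sketched below: the reader is selected once per batch from an array indexed by type length, so the per-value loop contains no branch on the length. The names `WidthSpecializedReaders`, `WidthReader`, and `decode` are hypothetical, and only widths 1 to 3 are shown for brevity.

```java
final class WidthSpecializedReaders
{
    // Hypothetical stand-in for the PR's ShortDecimalDecoder interface.
    interface WidthReader
    {
        long read(byte[] bytes, int offset);
    }

    // Index = width in bytes - 1. Each reader is fully unrolled for its width.
    private static final WidthReader[] READERS = {
        // Width 1: widening a byte to long already sign-extends.
        (bytes, offset) -> bytes[offset],
        // Width 2: sign-extend the high byte, mask the low byte.
        (bytes, offset) -> ((long) bytes[offset] << 8) | (bytes[offset + 1] & 0xFFL),
        // Width 3
        (bytes, offset) -> ((long) bytes[offset] << 16)
                | ((bytes[offset + 1] & 0xFFL) << 8)
                | (bytes[offset + 2] & 0xFFL),
    };

    private WidthSpecializedReaders() {}

    // Decodes values.length big-endian values of typeLength bytes each.
    static void decode(byte[] input, int typeLength, long[] values)
    {
        // Pick the reader once per batch, outside the hot loop.
        WidthReader reader = READERS[typeLength - 1];
        for (int i = 0; i < values.length; i++) {
            values[i] = reader.read(input, i * typeLength);
        }
    }
}
```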
| import static com.facebook.presto.parquet.ParquetTypeUtils.getShortDecimalValue; | ||
| import static com.google.common.base.Preconditions.checkArgument; | ||
|
|
||
| public class BinaryShortDecimalDeltaValuesDecoder |
There was a problem hiding this comment.
Shall we have an abstract class with BinaryLongDecimalDeltaValuesDecoder and BinaryShortDecimalDeltaValuesDecoder as its subclasses?
BinaryLongDecimalDeltaValuesDecoder and BinaryShortDecimalDeltaValuesDecoder share a lot of identical code.
There was a problem hiding this comment.
I'll see how to refactor these two classes because they implement different interfaces.
| import static com.google.common.base.Preconditions.checkArgument; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class Int64ShortDecimalDeltaValuesDecoder |
There was a problem hiding this comment.
Shall we have an abstract class with Int32ShortDecimalDeltaValuesDecoder and Int64ShortDecimalDeltaValuesDecoder as its subclasses? The two classes share a lot of identical code.
There was a problem hiding this comment.
The common parts of Int32ShortDecimalDeltaValuesDecoder and Int64ShortDecimalDeltaValuesDecoder have been extracted into AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder.
| import static io.airlift.slice.SizeOf.SIZE_OF_LONG; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class LongDecimalApacheParquetValueDecoder |
There was a problem hiding this comment.
Bad naming: why do "Apache" and "Parquet" appear in the name LongDecimalApacheParquetValueDecoder?
There was a problem hiding this comment.
I have renamed LongDecimalApacheParquetValueDecoder and ShortDecimalApacheParquetValueDecoder to FixedLenByteArrayLongDecimalDeltaValueDecoder and FixedLenByteArrayShortDecimalDeltaValueDecoder respectively.
| import static com.google.common.base.Preconditions.checkArgument; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class ShortDecimalApacheParquetValueDecoder |
presto-common/src/main/java/com/facebook/presto/common/type/UnscaledDecimal128Arithmetic.java
| return decimalLogicalTypeAnnotation.getPrecision() <= Decimals.MAX_SHORT_PRECISION; | ||
| } | ||
|
|
||
| public static boolean isLongDecimalType(ColumnDescriptor descriptor) |
presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java
presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java
presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java
| int inputBytesOffset = input.getByteArrayOffset(); | ||
| for (int i = offset; i < offset + length; i++) { | ||
| checkBytesFitInShortDecimal(inputBytes, inputBytesOffset, extraBytesLength, columnDescriptor); | ||
| values[i] = getShortDecimalValue(inputBytes, inputBytesOffset + extraBytesLength, Long.BYTES); |
There was a problem hiding this comment.
What if extraBytesLength < 0?
There was a problem hiding this comment.
In this code path extraBytesLength cannot be negative; it is always greater than 0, because the typeLength <= Long.BYTES case is handled first:
if (typeLength <= Long.BYTES) {
....
}
int extraBytesLength = typeLength - Long.BYTES;
| // Equivalent to expectedValue = bytes[endOffset] < 0 ? -1 : 0 | ||
| byte expectedValue = (byte) (bytes[endOffset] >> 7); | ||
| for (int i = offset; i < endOffset; i++) { | ||
| if (bytes[i] != expectedValue) { |
There was a problem hiding this comment.
Could you explain how this works? Suppose inputBytesOffset = 0 and typeLength = 9 here. Then extraBytesLength = 1, and checkBytesFitInShortDecimal(inputBytes, 0, 1, descriptor) is called, so expectedValue is computed as inputBytes[1] < 0 ? -1 : 0. Since the values are encoded in big-endian byte order, I assume you wanted to check the most significant byte, which should be inputBytes[0], but you're checking the second most significant byte, inputBytes[1]. Does that work?
Also, the largest precision for a short decimal is 18, and the largest value is 999,999,999,999,999,999, which can be expressed as the 60-bit value 0xDE0B6B3A763FFFF. If you really need to verify that there is no overflow, bits 61-64 also need to be checked. I don't see that done here. Could you explain your idea a little?
There was a problem hiding this comment.
In the code above, bytes[endOffset] is actually the most significant byte. To illustrate this better, I built a test locally. The original value is 123456789012.12345678 with typeLength=14, and the contents of bytes are [2, 0, 0, 0, 3, 1, 0, 0, 0, 0, 0, 0, -85, 84, -87, -116, -23, -53, -11, 78]
NOTE: -85, 84, -87, -116, -23, -53, -11, 78 converted to binary is actually the high 56-bit data of 12345678901212345678 converted to binary.
-85, 84, -87, -116, -23, -53, -11, 78 binary representation:
10101011010101001010100110001100111010011100101111110101
12345678901212345678 binary representation:
1010101101010100101010011000110011101001110010111111010101001110
In this scenario, inputBytesOffset = 6, extraBytesLength = 6, so endOffset = 12, bytes[endOffset] = -85, which is actually the most significant byte. Then we check whether bytes[6.. 11] is -1. Attached is the file I tested.
86216465-ad5d-4acc-bc8b-0d1972149d8c.tgz
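The check described in this thread can be sketched in isolation as follows. This is an illustrative reduction, not the PR's method: the class name `ShortDecimalFitCheck` and the boolean return are assumptions made to keep the example self-contained (the quoted code throws a PrestoException instead).

```java
final class ShortDecimalFitCheck
{
    private ShortDecimalFitCheck() {}

    // Returns true when the extraBytesLength leading bytes starting at offset
    // are pure sign extension of the 8-byte value that follows them, i.e. the
    // whole big-endian value fits in a signed 64-bit long.
    static boolean fitsInLong(byte[] bytes, int offset, int extraBytesLength)
    {
        int endOffset = offset + extraBytesLength;
        // 0 when the 8-byte value is non-negative, -1 (0xFF) when it is negative.
        byte expectedValue = (byte) (bytes[endOffset] >> 7);
        for (int i = offset; i < endOffset; i++) {
            if (bytes[i] != expectedValue) {
                return false;
            }
        }
        return true;
    }
}
```

With the byte array quoted above, offset 6 and extraBytesLength 6, bytes[12] is -85, so the expected padding byte is -1, while bytes[6..11] are all 0 and the sketch returns false. That matches the data: 12345678901212345678 is larger than Long.MAX_VALUE (9223372036854775807). Note that this sketch only establishes that the value fits in a long; it does not compare the value against the 18-digit bound raised in the review.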
elharo
left a comment
There was a problem hiding this comment.
This PR seems short on new tests. I would have expected quite a number given all the new code.
presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java
|
|
||
| for (int i = 0; i < bytes.length; i++) { | ||
| value |= ((long) bytes[bytes.length - i - 1] & 0xFFL) << (8 * i); | ||
| public static long getShortDecimalValue(byte[] bytes, int startOffset, int length) |
There was a problem hiding this comment.
Can this be private, or at least non-public?
There was a problem hiding this comment.
No, this method is used in many places.
| return data; | ||
| } | ||
|
|
||
| private static int propagateSignBit(int value, int bitsToPad) |
There was a problem hiding this comment.
I looked at the code and found that this function can actually be deleted.
| out[7 + outPos] = (byte) (inByte >> 7 & 1); | ||
| } | ||
|
|
||
| public static long propagateSignBit(long value, int bitsToPad) |
There was a problem hiding this comment.
This method could use a unit test that addresses it directly
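A direct test could look roughly like the sketch below. The body of `propagateSignBit` shown here is an assumption (a left shift followed by an arithmetic right shift, with explicit parentheses); the PR's actual implementation is not visible in this thread, and the class name `SignBits` is hypothetical.

```java
final class SignBits
{
    private SignBits() {}

    // Assumed implementation: shifting left by bitsToPad moves the value's own
    // sign bit into bit 63; the arithmetic right shift then copies that bit
    // back down over the padding.
    static long propagateSignBit(long value, int bitsToPad)
    {
        return (value << bitsToPad) >> bitsToPad;
    }

    // Fixed inputs only, so a failure is reproducible.
    static void checkPropagateSignBit()
    {
        check(propagateSignBit(0xFFL, 56) == -1L, "0xFF as one byte");
        check(propagateSignBit(0x80L, 56) == -128L, "0x80 as one byte");
        check(propagateSignBit(0x7FL, 56) == 127L, "0x7F as one byte");
        check(propagateSignBit(0xFFFEL, 48) == -2L, "0xFFFE as two bytes");
        check(propagateSignBit(5L, 0) == 5L, "no padding");
    }

    private static void check(boolean condition, String message)
    {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```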
| public SimpleSliceInputStream(Slice slice, int offset) | ||
| { | ||
| this.slice = requireNonNull(slice, "slice is null"); | ||
| checkArgument(slice.length() == 0 || slice.hasByteArray(), "SimpleSliceInputStream supports only slices backed by byte array"); |
|
@zhenxiao @yingsu00 @elharo sorry for the late reply. I have made modifications according to the previous review comments.
I generated Parquet files with different encodings locally. I will see how to add them to the test set. |
|
Nit, suggest release note entry change: |
|
@zhenxiao @yingsu00 I added three new tests in |
elharo
left a comment
There was a problem hiding this comment.
There's a lot of new public API here that feels like it needs unit tests, e.g. SimpleSliceInputStream
|
Hi @tdcmeehan @elharo, thank you for your review. I'll add some test cases soon. |
| if ((encoding == DELTA_BYTE_ARRAY || encoding == DELTA_LENGTH_BYTE_ARRAY) && type == PrimitiveTypeName.BINARY) { | ||
| ByteBufferInputStream inputStream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, offset, length)); | ||
|
|
||
| Optional<Type> prestoType = createDecimalType(columnDescriptor); |
There was a problem hiding this comment.
I know this is from existing code, but it is confusing to detect whether the logical type is decimal by calling createDecimalType() and checking its return value. It would be clearer to make the type detection consistent with the other encodings, e.g. as you did on lines 195 and 196.
There was a problem hiding this comment.
Already refactored, thanks.
|
|
||
| public FixedLenByteArrayShortDecimalPlainValuesDecoder(ColumnDescriptor columnDescriptor, byte[] byteBuffer, int bufferOffset, int length) | ||
| { | ||
| this.columnDescriptor = columnDescriptor; |
There was a problem hiding this comment.
I know the existing decoders code has the same pattern, but can you please add requireNonNull() on columnDescriptor and byteBuffer? Same for all other constructors for the classes you add. Thanks!
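The requested pattern can be sketched as follows. `ExamplePlainValuesDecoder` is a hypothetical class, and `Object` stands in for Parquet's ColumnDescriptor so that the example compiles on its own.

```java
import static java.util.Objects.requireNonNull;

final class ExamplePlainValuesDecoder
{
    private final Object columnDescriptor; // stand-in for ColumnDescriptor
    private final byte[] byteBuffer;
    private final int bufferOffset;
    private final int length;

    ExamplePlainValuesDecoder(Object columnDescriptor, byte[] byteBuffer, int bufferOffset, int length)
    {
        // Fail fast at construction time with a message naming the argument,
        // instead of a bare NullPointerException later inside readNext.
        this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
        this.byteBuffer = requireNonNull(byteBuffer, "byteBuffer is null");
        this.bufferOffset = bufferOffset;
        this.length = length;
    }
}
```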
|
Hi @elharo @tdcmeehan @yingsu00, thanks for your input and sorry for the delay. I modified some code according to the previous review and added a new test case |
| throws Exception | ||
| { | ||
| for (int precision = 1; precision <= MAX_SHORT_PRECISION; precision++) { | ||
| int scale = ThreadLocalRandom.current().nextInt(precision); |
There was a problem hiding this comment.
I get very nervous seeing random in tests. If something fails, it's not reproducible. Use fixed constant test data instead, even if the fixed values were initially chosen at random.
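One way to follow this suggestion is to freeze a scale per precision in a constant table, as in the sketch below. The class name `FixedDecimalTestData` and the specific scale values are illustrative assumptions; the only constraint carried over from the quoted test is that each scale is in the range 0 to precision - 1, with precision running from 1 to 18.

```java
final class FixedDecimalTestData
{
    // Scales picked once and then frozen. Index = precision - 1, so entry i
    // must satisfy 0 <= SCALES[i] <= i.
    private static final int[] SCALES = {0, 1, 0, 2, 4, 1, 6, 3, 8, 5, 0, 11, 7, 2, 14, 9, 16, 12};

    private FixedDecimalTestData() {}

    static int maxPrecision()
    {
        return SCALES.length;
    }

    // Deterministic replacement for ThreadLocalRandom.current().nextInt(precision).
    static int scaleFor(int precision)
    {
        return SCALES[precision - 1];
    }
}
```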
| if (!isNested && isShortDecimalType(descriptor)) { | ||
| int precision = ((DecimalLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getPrecision(); | ||
| if (precision < 10) { | ||
| log.warn("PrimitiveTypeName is INT64 but precision is less then 10."); |
There was a problem hiding this comment.
Delete the warning; it's not actionable.
Or, if I'm wrong and this is a real problem, then a warning isn't enough. This should fail outright.
There was a problem hiding this comment.
The spec says we need to produce a warning when precision < 10 for INT64. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
| * is not a common one, just use the existing one provided by Parquet library and add a wrapper around it that satisfies the | ||
| * {@link ShortDecimalValuesDecoder} interface. | ||
| */ | ||
| public class FixedLenByteArrayShortDecimalDeltaValueDecoder |
There was a problem hiding this comment.
Avoid abbreviations; thus FixedLen --> FixedLength.
But aren't all arrays in Java fixed length? So maybe just ByteArrayShortDecimalDeltaValueDecoder.
There was a problem hiding this comment.
The naming convention of the Decoder class name here is ParquetPrimitiveType + [Short|Long]Decimal + encoding + ValuesDecoder. The PrimitiveTypeName corresponding to this class is FIXED_LEN_BYTE_ARRAY, so this name is used.
| int positionOffset = offsets[i]; | ||
| int positionLength = offsets[i + 1] - positionOffset; | ||
| byte[] temp = new byte[positionLength]; | ||
| System.arraycopy(byteBuffer, positionOffset, temp, 0, positionLength); |
There was a problem hiding this comment.
Personally I find Arrays.copyOf a little easier to read; up to you
There was a problem hiding this comment.
I changed it to byte[] temp = Arrays.copyOfRange(byteBuffer, positionOffset, positionOffset + positionLength);
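The two forms discussed here produce the same copy for an in-bounds range, as the following sketch shows. The class and method names are hypothetical.

```java
import java.util.Arrays;

final class CopyRangeExample
{
    private CopyRangeExample() {}

    // Original form: allocate, then copy.
    static byte[] viaArraycopy(byte[] byteBuffer, int positionOffset, int positionLength)
    {
        byte[] temp = new byte[positionLength];
        System.arraycopy(byteBuffer, positionOffset, temp, 0, positionLength);
        return temp;
    }

    // Revised form: the second argument is the start index and the third is
    // the exclusive end index, not a length.
    static byte[] viaCopyOfRange(byte[] byteBuffer, int positionOffset, int positionLength)
    {
        return Arrays.copyOfRange(byteBuffer, positionOffset, positionOffset + positionLength);
    }
}
```

One difference to keep in mind: when the end index runs past the source array, Arrays.copyOfRange pads the result with zeros, whereas System.arraycopy throws an IndexOutOfBoundsException.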
| @Override | ||
| public void readNext(long[] values, int offset, int length) | ||
| { | ||
| final byte[] localByteBuffer = byteBuffer; |
There was a problem hiding this comment.
Danger! Since this local variable is just a reference, byteBuffer is modified anyway when localByteBuffer is. Code below might be correct, I'm not sure, but this variable should be removed.
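The aliasing behaviour behind this comment can be demonstrated with a small sketch. `AliasExample` is a hypothetical class that mirrors only the field and local variable names from the quoted code.

```java
final class AliasExample
{
    private final byte[] byteBuffer = {1, 2, 3};

    byte firstAfterLocalWrite()
    {
        // This copies the reference, not the array: both names now point
        // at the same underlying byte[].
        final byte[] localByteBuffer = byteBuffer;
        localByteBuffer[0] = 42;
        // The write through the local name is visible through the field.
        return byteBuffer[0];
    }
}
```

Here `final` only prevents reassigning `localByteBuffer`; it does not make the array contents immutable.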
| byte expectedValue = (byte) (bytes[endOffset] >> 7); | ||
| for (int i = offset; i < endOffset; i++) { | ||
| if (bytes[i] != expectedValue) { | ||
| throw new PrestoException(NOT_SUPPORTED, format( |
There was a problem hiding this comment.
string concatenation is simpler here
|
|
||
| // We first shift the byte as left as possible. Then, when shifting back right, | ||
| // the sign bit will get propagated | ||
| values[offset] = value << 56 >> 56; |
There was a problem hiding this comment.
Order of operations is foggy. Please use parentheses to make this explicit.
| long[] result = new long[size]; | ||
| for (int i = 0; i < size; i++) { | ||
| result[i] = Math.max( | ||
| Math.min(randomLong(random, bitWidth), max), |
There was a problem hiding this comment.
Avoid random numbers in tests. Test results need to be reproducible.
|
Thank you @elharo for the review! I have updated with a new (squashed) commit, and I have addressed all your comments. Let me know if I missed anything. |
|
@wypb There is a test failure: Failures: Can you please investigate? Thanks! |
|
@yingsu00 this is actually likely due to facebookincubator/velox#10261 and is being backed out in facebookincubator/velox#10431. It was added in #23138 (which appeared to have been merged in spite of the failure it introduced). See: #23156 |
|
Hi @tdcmeehan thank you for sharing the information. @yingsu00 I synchronized the latest code, and now CI is all green. |
Thank you @tdcmeehan for letting me know! |
Description
Add support for decimal batch reader
Benchmark (lower is better)
Impact
When the Parquet batch reader is enabled (parquet_batch_read_optimization_enabled=true), Decimal columns are read in batch mode.
Test Plan
Contributor checklist
Release Notes
CC: @zhenxiao