Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,23 @@ public void testDecimalBackedByINT64()
}
}

/**
 * Round-trips INT64-backed DECIMAL columns for every precision from
 * MAX_PRECISION_INT32 + 1 through MAX_PRECISION_INT64, writing the longs returned by
 * longsBetween(1, 1_000) and expecting the matching SqlDecimal values back.
 */
@Test
public void testRLEDecimalBackedByINT64()
        throws Exception
{
    // Indexed by (precision - firstPrecision); every entry currently uses scale 9.
    int[] scaleByPrecisionOffset = {9, 9, 9, 9, 9, 9, 9, 9, 9};
    int firstPrecision = MAX_PRECISION_INT32 + 1;

    for (int precision = firstPrecision; precision <= MAX_PRECISION_INT64; precision++) {
        int scale = scaleByPrecisionOffset[precision - firstPrecision];

        String schemaText = format("message hive_decimal { optional INT64 test (DECIMAL(%d, %d)); }", precision, scale);
        MessageType parquetSchema = parseMessageType(schemaText);

        // The same longs serve as the written values and as the unscaled values of the expected decimals.
        ContiguousSet<Long> unscaledValues = longsBetween(1, 1_000);
        ImmutableList.Builder<SqlDecimal> expected = ImmutableList.builder();
        for (Long unscaled : unscaledValues) {
            expected.add(SqlDecimal.of(unscaled, precision, scale));
        }

        tester.testRoundTrip(
                javaLongObjectInspector,
                unscaledValues,
                expected.build(),
                createDecimalType(precision, scale),
                Optional.of(parquetSchema));
    }
}

private void testDecimal(int precision, int scale, Optional<MessageType> parquetSchema)
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript
if (isTimeStampMicrosType(columnDescriptor) || isTimeMicrosType(columnDescriptor)) {
return new Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder(bitWidth, inputStream, (LongDictionary) dictionary);
}
if (isDecimalType(columnDescriptor) && isShortDecimalType(columnDescriptor)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we omit the first condition? Is there a scenario where the columnDescriptor is a ShortDecimalType but not a DecimalType?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's confusing, but technically these check for two separate things:

  • isDecimalType checks if the primitive type is DECIMAL
  • isShortDecimalType actually checks that the logical type annotation is decimal and then reads the precision parameter from the logical type to check whether it is a short decimal

From my understanding isShortDecimal should always be true if isDecimalType returns true. However, this is the same check as in the previous block for FLOAT. I would prefer to stay consistent. Also, it is my understanding that Presto may not properly write logical type annotations yet according to #23388 -- so maybe let's just keep this now for consistency's sake?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we look deeper at isDecimalType, we will find that it ultimately also verifies that the logical type annotation is decimal. So, as I understand it, if a columnDescriptor is a ShortDecimalType, it must first be a DecimalType. Furthermore, if the primitive type's logical type annotation is null, it can be neither a DecimalType nor a ShortDecimalType, so that does not break the conclusion above (my local experiments show the same behavior). We can also see that in the encoding == PLAIN clause, the checks for INT32 and INT64 use only isShortDecimalType.

I'm OK with keeping it as is for now, since I am not entirely sure whether some special scenarios still exist. But once this has been fully checked and confirmed, I think it would be better to delete the first condition (here and in the previous block you mentioned), because it would otherwise cause a lot of confusion for future readers.

return new Int64RLEDictionaryValuesDecoder(bitWidth, inputStream, (LongDictionary) dictionary);
}
}
case DOUBLE: {
return new Int64RLEDictionaryValuesDecoder(bitWidth, inputStream, (LongDictionary) dictionary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.parquet.batchreader.decoders.rle;

import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64ValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder;
import com.facebook.presto.parquet.dictionary.LongDictionary;
import org.apache.parquet.io.ParquetDecodingException;
import org.openjdk.jol.info.ClassLayout;
Expand All @@ -27,7 +28,7 @@

public class Int64RLEDictionaryValuesDecoder
extends BaseRLEBitPackedDecoder
implements Int64ValuesDecoder
implements Int64ValuesDecoder, ShortDecimalValuesDecoder
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int64RLEDictionaryValuesDecoder.class).instanceSize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int32ValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64TimeAndTimestampMicrosValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64ValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.TimestampValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.plain.BinaryPlainValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.plain.BooleanPlainValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.plain.Int32PlainValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.plain.Int32ShortDecimalPlainValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.plain.Int64PlainValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.plain.Int64ShortDecimalPlainValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.plain.Int64TimeAndTimestampMicrosPlainValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.plain.TimestampPlainValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.BinaryRLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.BooleanRLEValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.Int32RLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.Int32ShortDecimalRLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.Int64RLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.TimestampRLEDictionaryValuesDecoder;
Expand Down Expand Up @@ -118,6 +122,26 @@ private static BooleanValuesDecoder booleanRLE(byte[] pageBytes)
return new BooleanRLEValuesDecoder(ByteBuffer.wrap(pageBytes));
}

// Creates an Int32ShortDecimalPlainValuesDecoder over the full pageBytes array (from index 0 to pageBytes.length).
private static ShortDecimalValuesDecoder int32ShortDecimalPlain(byte[] pageBytes)
{
    int pageLength = pageBytes.length;
    ShortDecimalValuesDecoder decoder = new Int32ShortDecimalPlainValuesDecoder(pageBytes, 0, pageLength);
    return decoder;
}

// Creates an Int64ShortDecimalPlainValuesDecoder over the full pageBytes array (from index 0 to pageBytes.length).
private static ShortDecimalValuesDecoder int64ShortDecimalPlain(byte[] pageBytes)
{
    int pageLength = pageBytes.length;
    ShortDecimalValuesDecoder decoder = new Int64ShortDecimalPlainValuesDecoder(pageBytes, 0, pageLength);
    return decoder;
}

// Creates an Int32ShortDecimalRLEDictionaryValuesDecoder that reads dictionary ids from pageBytes,
// using the width derived from dictionarySize via getWidthFromMaxInt, and resolves them through the given dictionary.
private static ShortDecimalValuesDecoder int32ShortDecimalRLE(byte[] pageBytes, int dictionarySize, IntegerDictionary dictionary)
{
    int bitWidth = getWidthFromMaxInt(dictionarySize);
    ByteArrayInputStream pageStream = new ByteArrayInputStream(pageBytes);
    return new Int32ShortDecimalRLEDictionaryValuesDecoder(bitWidth, pageStream, dictionary);
}

// Creates an Int64RLEDictionaryValuesDecoder (used here through its ShortDecimalValuesDecoder interface)
// that reads dictionary ids from pageBytes, using the width derived from dictionarySize via getWidthFromMaxInt.
private static ShortDecimalValuesDecoder int64ShortDecimalRLE(byte[] pageBytes, int dictionarySize, LongDictionary dictionary)
{
    int bitWidth = getWidthFromMaxInt(dictionarySize);
    ByteArrayInputStream pageStream = new ByteArrayInputStream(pageBytes);
    return new Int64RLEDictionaryValuesDecoder(bitWidth, pageStream, dictionary);
}

private static void int32BatchReadWithSkipHelper(int batchSize, int skipSize, int valueCount, Int32ValuesDecoder decoder, List<Object> expectedValues)
throws IOException
{
Expand Down Expand Up @@ -213,6 +237,52 @@ private static void int64BatchReadWithSkipHelper(int batchSize, int skipSize, in
}
}

/**
 * Alternates between reading up to batchSize values and skipping up to skipSize values
 * until valueCount values have been consumed, asserting that each value read equals the
 * corresponding entry of expectedValues (cast to int).
 */
private static void int32ShortDecimalBatchReadWithSkipHelper(int batchSize, int skipSize, int valueCount, ShortDecimalValuesDecoder decoder, List<Object> expectedValues)
        throws IOException
{
    long[] decoded = new long[valueCount];
    int consumed = 0; // values read or skipped so far; also the index into expectedValues
    int written = 0; // next free slot in decoded (skipped values take no slot)

    while (consumed < valueCount) {
        int toRead = min(batchSize, valueCount - consumed);
        decoder.readNext(decoded, written, toRead);

        for (int i = 0; i < toRead; i++) {
            int expected = (int) expectedValues.get(consumed + i);
            assertEquals(decoded[written + i], expected);
        }
        consumed += toRead;
        written += toRead;

        // Never skip past the end of the page.
        int toSkip = min(skipSize, valueCount - consumed);
        decoder.skip(toSkip);
        consumed += toSkip;
    }
}

/**
 * Alternates between reading up to batchSize values and skipping up to skipSize values
 * until valueCount values have been consumed, asserting that each value read equals the
 * corresponding entry of expectedValues.
 */
private static void int64ShortDecimalBatchReadWithSkipHelper(int batchSize, int skipSize, int valueCount, ShortDecimalValuesDecoder decoder, List<Object> expectedValues)
        throws IOException
{
    long[] decoded = new long[valueCount];
    int consumed = 0; // values read or skipped so far; also the index into expectedValues
    int written = 0; // next free slot in decoded (skipped values take no slot)

    while (consumed < valueCount) {
        int toRead = min(batchSize, valueCount - consumed);
        decoder.readNext(decoded, written, toRead);

        for (int i = 0; i < toRead; i++) {
            Object expected = expectedValues.get(consumed + i);
            assertEquals(decoded[written + i], expected);
        }
        consumed += toRead;
        written += toRead;

        // Never skip past the end of the page.
        int toSkip = min(skipSize, valueCount - consumed);
        decoder.skip(toSkip);
        consumed += toSkip;
    }
}

private static void timestampBatchReadWithSkipHelper(int batchSize, int skipSize, int valueCount, TimestampValuesDecoder decoder, List<Object> expectedValues)
throws IOException
{
Expand Down Expand Up @@ -515,4 +585,100 @@ public void testBooleanRLE()
booleanBatchReadWithSkipHelper(89, 29, valueCount, booleanRLE(dataPage), expectedValues);
booleanBatchReadWithSkipHelper(1024, 1024, valueCount, booleanRLE(dataPage), expectedValues);
}

/**
 * Decodes a generated plain page (generatePlainValuesPage called with 32) through the
 * INT32 short-decimal plain decoder under several batch/skip size combinations,
 * using a fresh decoder for each combination.
 */
@Test
public void testInt32ShortDecimalPlain()
        throws IOException
{
    int valueCount = 2048;
    List<Object> expectedValues = new ArrayList<>();
    byte[] pageBytes = generatePlainValuesPage(valueCount, 32, new Random(83), expectedValues);

    // {batchSize, skipSize}; the first entry reads all values in one batch
    int[][] batchAndSkipSizes = {
            {valueCount, 0},
            {29, 0},
            {89, 0},
            {1024, 0},
            {256, 29},
            {89, 29},
            {1024, 1024},
    };

    for (int[] sizes : batchAndSkipSizes) {
        int32ShortDecimalBatchReadWithSkipHelper(sizes[0], sizes[1], valueCount, int32ShortDecimalPlain(pageBytes), expectedValues);
    }
}

/**
 * Decodes a generated plain page (generatePlainValuesPage called with 64) through the
 * INT64 short-decimal plain decoder under several batch/skip size combinations,
 * using a fresh decoder for each combination.
 */
@Test
public void testInt64ShortDecimalPlain()
        throws IOException
{
    int valueCount = 2048;
    List<Object> expectedValues = new ArrayList<>();
    byte[] pageBytes = generatePlainValuesPage(valueCount, 64, new Random(83), expectedValues);

    // {batchSize, skipSize}; the first entry reads all values in one batch
    int[][] batchAndSkipSizes = {
            {valueCount, 0},
            {29, 0},
            {89, 0},
            {1024, 0},
            {256, 29},
            {89, 29},
            {1024, 1024},
    };

    for (int[] sizes : batchAndSkipSizes) {
        int64ShortDecimalBatchReadWithSkipHelper(sizes[0], sizes[1], valueCount, int64ShortDecimalPlain(pageBytes), expectedValues);
    }
}

/**
 * Builds a 29-entry dictionary page and a page of dictionary ids, then checks that the
 * INT32 short-decimal RLE dictionary decoder returns the dictionary values those ids point to,
 * under several batch/skip size combinations with a fresh decoder for each.
 */
@Test
public void testInt32ShortDecimalRLE()
        throws IOException
{
    int valueCount = 2048;
    int dictionarySize = 29;
    Random random = new Random(83);

    // Both pages draw from the same Random, so the dictionary page must be generated first.
    List<Object> dictionary = new ArrayList<>();
    byte[] dictionaryPage = generatePlainValuesPage(dictionarySize, 32, random, dictionary);
    List<Integer> dictionaryIds = new ArrayList<>();
    byte[] dataPage = generateDictionaryIdPage2048(dictionarySize - 1, random, dictionaryIds);

    // Expected output: each id resolved through the generated dictionary values.
    List<Object> expectedValues = new ArrayList<>();
    for (int id : dictionaryIds) {
        expectedValues.add(dictionary.get(id));
    }

    DictionaryPage page = new DictionaryPage(Slices.wrappedBuffer(dictionaryPage), dictionarySize, PLAIN_DICTIONARY);
    IntegerDictionary integerDictionary = new IntegerDictionary(page);

    // {batchSize, skipSize}; the first entry reads all values in one batch
    int[][] batchAndSkipSizes = {
            {valueCount, 0},
            {29, 0},
            {89, 0},
            {1024, 0},
            {256, 29},
            {89, 29},
            {1024, 1024},
    };

    for (int[] sizes : batchAndSkipSizes) {
        int32ShortDecimalBatchReadWithSkipHelper(sizes[0], sizes[1], valueCount, int32ShortDecimalRLE(dataPage, dictionarySize, integerDictionary), expectedValues);
    }
}

/**
 * Builds a 29-entry dictionary page and a page of dictionary ids, then checks that
 * Int64RLEDictionaryValuesDecoder, used as a short-decimal decoder, returns the dictionary
 * values those ids point to, under several batch/skip size combinations with a fresh decoder for each.
 */
@Test
public void testInt64ShortDecimalRLE()
        throws IOException
{
    int valueCount = 2048;
    int dictionarySize = 29;
    Random random = new Random(83);

    // Both pages draw from the same Random, so the dictionary page must be generated first.
    List<Object> dictionary = new ArrayList<>();
    byte[] dictionaryPage = generatePlainValuesPage(dictionarySize, 64, random, dictionary);
    List<Integer> dictionaryIds = new ArrayList<>();
    byte[] dataPage = generateDictionaryIdPage2048(dictionarySize - 1, random, dictionaryIds);

    // Expected output: each id resolved through the generated dictionary values.
    List<Object> expectedValues = new ArrayList<>();
    for (int id : dictionaryIds) {
        expectedValues.add(dictionary.get(id));
    }

    DictionaryPage page = new DictionaryPage(Slices.wrappedBuffer(dictionaryPage), dictionarySize, PLAIN_DICTIONARY);
    LongDictionary longDictionary = new LongDictionary(page);

    // {batchSize, skipSize}; the first entry reads all values in one batch
    int[][] batchAndSkipSizes = {
            {valueCount, 0},
            {29, 0},
            {89, 0},
            {1024, 0},
            {256, 29},
            {89, 29},
            {1024, 1024},
    };

    for (int[] sizes : batchAndSkipSizes) {
        int64ShortDecimalBatchReadWithSkipHelper(sizes[0], sizes[1], valueCount, int64ShortDecimalRLE(dataPage, dictionarySize, longDictionary), expectedValues);
    }
}
}