From 614a91d369b3df7e33e7c726ce9e8805f5dd5647 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Wed, 23 Aug 2023 09:32:21 +0530 Subject: [PATCH 1/3] Remove usage of Binary for reading parquet dictionary --- .../parquet/dictionary/BinaryDictionary.java | 27 ++++++---------- .../trino/parquet/dictionary/Dictionary.java | 4 +-- .../parquet/dictionary/DictionaryReader.java | 2 +- .../TupleDomainParquetPredicate.java | 32 +++++++++++-------- 4 files changed, 31 insertions(+), 34 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/BinaryDictionary.java b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/BinaryDictionary.java index 258537396293..4a0f20257b1c 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/BinaryDictionary.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/BinaryDictionary.java @@ -15,53 +15,46 @@ import io.airlift.slice.Slice; import io.trino.parquet.DictionaryPage; -import org.apache.parquet.io.api.Binary; - -import java.io.IOException; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian; public class BinaryDictionary implements Dictionary { - private final Binary[] content; + private final Slice[] content; public BinaryDictionary(DictionaryPage dictionaryPage) - throws IOException { this(dictionaryPage, null); } public BinaryDictionary(DictionaryPage dictionaryPage, Integer length) - throws IOException { - content = new Binary[dictionaryPage.getDictionarySize()]; + content = new Slice[dictionaryPage.getDictionarySize()]; Slice dictionarySlice = dictionaryPage.getSlice(); - byte[] dictionaryBytes = dictionarySlice.byteArray(); - int offset = dictionarySlice.byteArrayOffset(); + int currentInputOffset = 0; if (length == null) { for (int i = 0; i < content.length; i++) { - int len = readIntLittleEndian(dictionaryBytes, offset); - offset += 4; - content[i] = Binary.fromReusedByteArray(dictionaryBytes, offset, len); - offset += len; + int positionLength = dictionarySlice.getInt(currentInputOffset); + currentInputOffset += Integer.BYTES; + content[i] = dictionarySlice.slice(currentInputOffset, positionLength); + currentInputOffset += positionLength; } } else { checkArgument(length > 0, "Invalid byte array length: %s", length); for (int i = 0; i < content.length; i++) { - content[i] = Binary.fromReusedByteArray(dictionaryBytes, offset, length); - offset += length; + content[i] = dictionarySlice.slice(currentInputOffset, length); + currentInputOffset += length; } } } @Override - public Binary decodeToBinary(int id) + public Slice decodeToSlice(int id) { return content[id]; } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/Dictionary.java b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/Dictionary.java index 63d6dcb68ca3..29720538e62e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/Dictionary.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/Dictionary.java @@ -13,11 +13,11 @@ */ package io.trino.parquet.dictionary; -import org.apache.parquet.io.api.Binary; +import io.airlift.slice.Slice; public interface Dictionary { - default Binary decodeToBinary(int id) + default Slice decodeToSlice(int id) { throw new UnsupportedOperationException(); } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DictionaryReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DictionaryReader.java index dd727bd604d1..f670de476f54 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DictionaryReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DictionaryReader.java @@ -50,7 +50,7 @@ public int readValueDictionaryId() @Override public Binary readBytes() { - return dictionary.decodeToBinary(readInt()); + return Binary.fromConstantByteArray(dictionary.decodeToSlice(readInt()).getBytes()); } @Override diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java index f001df7bee85..cdd279ae1d16 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java @@ -307,11 +307,13 @@ public static Domain getDomain( } try { + Object min = statistics.genericGetMin(); + Object max = statistics.genericGetMax(); return getDomain( column, type, - ImmutableList.of(statistics.genericGetMin()), - ImmutableList.of(statistics.genericGetMax()), + ImmutableList.of(min instanceof Binary ? Slices.wrappedBuffer(((Binary) min).getBytes()) : min), + ImmutableList.of(max instanceof Binary ? Slices.wrappedBuffer(((Binary) max).getBytes()) : max), hasNullValue, timeZone); } @@ -372,8 +374,8 @@ private static Domain getDomain( Object min = minimums.get(i); Object max = maximums.get(i); - long minValue = min instanceof Binary ? getShortDecimalValue(((Binary) min).getBytes()) : asLong(min); - long maxValue = max instanceof Binary ? getShortDecimalValue(((Binary) max).getBytes()) : asLong(max); + long minValue = min instanceof Slice ? getShortDecimalValue(((Slice) min).getBytes()) : asLong(min); + long maxValue = max instanceof Slice ? getShortDecimalValue(((Slice) max).getBytes()) : asLong(max); if (isStatisticsOverflow(type, minValue, maxValue)) { return Domain.create(ValueSet.all(type), hasNullValue); @@ -384,8 +386,8 @@ private static Domain getDomain( } else { for (int i = 0; i < minimums.size(); i++) { - Int128 min = Int128.fromBigEndian(((Binary) minimums.get(i)).getBytes()); - Int128 max = Int128.fromBigEndian(((Binary) maximums.get(i)).getBytes()); + Int128 min = Int128.fromBigEndian(((Slice) minimums.get(i)).getBytes()); + Int128 max = Int128.fromBigEndian(((Slice) maximums.get(i)).getBytes()); rangesBuilder.addRangeInclusive(min, max); } @@ -427,8 +429,8 @@ private static Domain getDomain( if (type instanceof VarcharType) { SortedRangeSet.Builder rangesBuilder = SortedRangeSet.builder(type, minimums.size()); for (int i = 0; i < minimums.size(); i++) { - Slice min = Slices.wrappedHeapBuffer(((Binary) minimums.get(i)).toByteBuffer()); - Slice max = Slices.wrappedHeapBuffer(((Binary) maximums.get(i)).toByteBuffer()); + Slice min = (Slice) minimums.get(i); + Slice max = (Slice) maximums.get(i); rangesBuilder.addRangeInclusive(min, max); } return Domain.create(rangesBuilder.build(), hasNullValue); @@ -446,11 +448,11 @@ private static Domain getDomain( // PARQUET-1065 deprecated them. The result is that any writer that produced stats was producing unusable incorrect values, except // the special case where min == max and an incorrect ordering would not be material to the result. PARQUET-1026 made binary stats // available and valid in that special case - if (!(min instanceof Binary) || !(max instanceof Binary) || !min.equals(max)) { + if (!(min instanceof Slice) || !(max instanceof Slice) || !min.equals(max)) { return Domain.create(ValueSet.all(type), hasNullValue); } - rangesBuilder.addValue(timestampEncoder.getTimestamp(decodeInt96Timestamp((Binary) min))); + rangesBuilder.addValue(timestampEncoder.getTimestamp(decodeInt96Timestamp(Binary.fromConstantByteArray(((Slice) min).getBytes())))); } return Domain.create(rangesBuilder.build(), hasNullValue); } @@ -732,11 +734,13 @@ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics statis return false; } + T min = statistic.getMin(); + T max = statistic.getMax(); Domain domain = getDomain( columnDescriptor, columnDomain.getType(), - ImmutableList.of(statistic.getMin()), - ImmutableList.of(statistic.getMax()), + ImmutableList.of(min instanceof Binary ? Slices.wrappedBuffer(((Binary) min).getBytes()) : min), + ImmutableList.of(max instanceof Binary ? Slices.wrappedBuffer(((Binary) max).getBytes()) : max), true, timeZone); return !columnDomain.overlaps(domain); @@ -774,7 +778,7 @@ private Function getConverter(PrimitiveType primitiveType) case BINARY: case INT96: default: - return buffer -> Binary.fromReusedByteBuffer(buffer); + return Slices::wrappedHeapBuffer; } } } @@ -796,7 +800,7 @@ private Function getConverter(PrimitiveType primitiveType) case INT64 -> (i) -> dictionary.decodeToLong(i); case FLOAT -> (i) -> dictionary.decodeToFloat(i); case DOUBLE -> (i) -> dictionary.decodeToDouble(i); - case FIXED_LEN_BYTE_ARRAY, BINARY, INT96 -> (i) -> dictionary.decodeToBinary(i); + case FIXED_LEN_BYTE_ARRAY, BINARY, INT96 -> (i) -> dictionary.decodeToSlice(i); }; } } From 692af351ac5d9b1c781c0c1967c38a49512ea464 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Wed, 23 Aug 2023 09:59:17 +0530 Subject: [PATCH 2/3] Move DictionaryReader to parquet tests --- .../io/trino/parquet/ParquetEncoding.java | 35 ------------------- .../decoders/AbstractValueDecodersTest.java | 2 +- .../reader/decoders}/DictionaryReader.java | 3 +- 3 files changed, 3 insertions(+), 37 deletions(-) rename lib/trino-parquet/src/{main/java/io/trino/parquet/dictionary => test/java/io/trino/parquet/reader/decoders}/DictionaryReader.java (96%) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java index f73524f7c09f..b547628e41e0 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetEncoding.java @@ -15,7 +15,6 @@ import io.trino.parquet.dictionary.BinaryDictionary; import io.trino.parquet.dictionary.Dictionary; -import io.trino.parquet.dictionary.DictionaryReader; import io.trino.parquet.dictionary.DoubleDictionary; import io.trino.parquet.dictionary.FloatDictionary; import io.trino.parquet.dictionary.IntegerDictionary; @@ -106,24 +105,12 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu }, PLAIN_DICTIONARY { - @Override - public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) - { - return RLE_DICTIONARY.getDictionaryBasedValuesReader(descriptor, valuesType, dictionary); - } - @Override public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException { return PLAIN.initDictionary(descriptor, dictionaryPage); } - - @Override - public boolean usesDictionary() - { - return true; - } }, DELTA_BINARY_PACKED { @@ -156,24 +143,12 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu }, RLE_DICTIONARY { - @Override - public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) - { - return new DictionaryReader(dictionary); - } - @Override public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException { return PLAIN.initDictionary(descriptor, dictionaryPage); } - - @Override - public boolean usesDictionary() - { - return true; - } }; static final int INT96_TYPE_LENGTH = 12; @@ -192,11 +167,6 @@ static int getMaxLevel(ColumnDescriptor descriptor, ValuesType valuesType) }; } - public boolean usesDictionary() - { - return false; - } - public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException { @@ -207,9 +177,4 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu { throw new UnsupportedOperationException("Error decoding values in encoding: " + this.name()); } - - public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) - { - throw new UnsupportedOperationException(" Dictionary encoding is not supported for: " + name()); - } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java index 8c37cefc09f5..0581a5273bfe 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java @@ -241,7 +241,7 @@ static Object[][] testArgs( static ValuesReader getApacheParquetReader(ParquetEncoding encoding, PrimitiveField field, Optional dictionary) { if (encoding == RLE_DICTIONARY || encoding == PLAIN_DICTIONARY) { - return encoding.getDictionaryBasedValuesReader(field.getDescriptor(), VALUES, dictionary.orElseThrow()); + return new DictionaryReader(dictionary.orElseThrow()); } checkArgument(dictionary.isEmpty(), "dictionary should be empty"); return encoding.getValuesReader(field.getDescriptor(), VALUES); diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DictionaryReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/DictionaryReader.java similarity index 96% rename from lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DictionaryReader.java rename to lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/DictionaryReader.java index f670de476f54..e78176c62749 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DictionaryReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/DictionaryReader.java @@ -11,8 +11,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.parquet.dictionary; +package io.trino.parquet.reader.decoders; +import io.trino.parquet.dictionary.Dictionary; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; From 484abf14cbf8a890246b6fdd24af6c831b277541 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Wed, 23 Aug 2023 10:22:11 +0530 Subject: [PATCH 3/3] Use enhanced switch case in TupleDomainParquetPredicate --- .../TupleDomainParquetPredicate.java | 25 ++++++------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java index cdd279ae1d16..c69d0506a02c 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/TupleDomainParquetPredicate.java @@ -763,23 +763,14 @@ private ColumnIndexValueConverter() private Function getConverter(PrimitiveType primitiveType) { - switch (primitiveType.getPrimitiveTypeName()) { - case BOOLEAN: - return buffer -> buffer.get(0) != 0; - case INT32: - return buffer -> buffer.order(LITTLE_ENDIAN).getInt(0); - case INT64: - return buffer -> buffer.order(LITTLE_ENDIAN).getLong(0); - case FLOAT: - return buffer -> buffer.order(LITTLE_ENDIAN).getFloat(0); - case DOUBLE: - return buffer -> buffer.order(LITTLE_ENDIAN).getDouble(0); - case FIXED_LEN_BYTE_ARRAY: - case BINARY: - case INT96: - default: - return Slices::wrappedHeapBuffer; - } + return switch (primitiveType.getPrimitiveTypeName()) { + case BOOLEAN -> buffer -> buffer.get(0) != 0; + case INT32 -> buffer -> buffer.order(LITTLE_ENDIAN).getInt(0); + case INT64 -> buffer -> buffer.order(LITTLE_ENDIAN).getLong(0); + case FLOAT -> buffer -> buffer.order(LITTLE_ENDIAN).getFloat(0); + case DOUBLE -> buffer -> buffer.order(LITTLE_ENDIAN).getDouble(0); + case FIXED_LEN_BYTE_ARRAY, BINARY, INT96 -> Slices::wrappedHeapBuffer; + }; } }