diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java index 3b6d78d1a144..5992a719fd3b 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java @@ -13,38 +13,19 @@ */ package io.trino.parquet.reader.decoders; -import io.airlift.slice.Slice; -import io.airlift.slice.Slices; import io.trino.parquet.reader.SimpleSliceInputStream; -import io.trino.parquet.reader.flat.BinaryBuffer; import io.trino.plugin.base.type.DecodedTimestamp; -import io.trino.spi.type.CharType; -import io.trino.spi.type.Chars; -import io.trino.spi.type.Decimals; -import io.trino.spi.type.Int128; -import io.trino.spi.type.VarcharType; -import io.trino.spi.type.Varchars; import org.apache.parquet.bytes.ByteBufferInputStream; -import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.schema.LogicalTypeAnnotation; import java.io.IOException; import java.io.UncheckedIOException; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import static com.google.common.base.Preconditions.checkArgument; import static io.trino.parquet.ParquetReaderUtils.castToByte; import static io.trino.parquet.ParquetTimestampUtils.decodeInt96Timestamp; -import static io.trino.parquet.ParquetTypeUtils.checkBytesFitInShortDecimal; -import static io.trino.parquet.ParquetTypeUtils.getShortDecimalValue; import static io.trino.parquet.reader.flat.Int96ColumnAdapter.Int96Buffer; -import static io.trino.spi.type.Varchars.truncateToLength; import static java.util.Objects.requireNonNull; -import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; /** * This is a set of proxy value decoders that use a delegated value reader from apache lib. @@ -93,232 +74,6 @@ public void skip(int n) } } - public static final class ShortDecimalApacheParquetValueDecoder - implements ValueDecoder - { - private final ValuesReader delegate; - private final ColumnDescriptor descriptor; - private final int typeLength; - - public ShortDecimalApacheParquetValueDecoder(ValuesReader delegate, ColumnDescriptor descriptor) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); - checkArgument( - logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation - && decimalAnnotation.getPrecision() <= Decimals.MAX_SHORT_PRECISION, - "Column %s is not a short decimal", - descriptor); - this.typeLength = descriptor.getPrimitiveType().getTypeLength(); - checkArgument(typeLength > 0 && typeLength <= 16, "Expected column %s to have type length in range (1-16)", descriptor); - this.descriptor = descriptor; - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(long[] values, int offset, int length) - { - int bytesOffset = 0; - int bytesLength = typeLength; - if (typeLength > Long.BYTES) { - bytesOffset = typeLength - Long.BYTES; - bytesLength = Long.BYTES; - } - for (int i = offset; i < offset + length; i++) { - byte[] bytes = delegate.readBytes().getBytes(); - checkBytesFitInShortDecimal(bytes, 0, bytesOffset, descriptor); - values[i] = getShortDecimalValue(bytes, bytesOffset, bytesLength); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } - - public static final class LongDecimalApacheParquetValueDecoder - implements ValueDecoder - { - private final ValuesReader delegate; - - public LongDecimalApacheParquetValueDecoder(ValuesReader delegate) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(long[] values, int offset, int length) - { - int endOffset = (offset + length) * 2; - for (int currentOutputOffset = offset * 2; currentOutputOffset < endOffset; currentOutputOffset += 2) { - Int128 value = Int128.fromBigEndian(delegate.readBytes().getBytes()); - values[currentOutputOffset] = value.getHigh(); - values[currentOutputOffset + 1] = value.getLow(); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } - - public static final class BoundedVarcharApacheParquetValueDecoder - implements ValueDecoder - { - private final ValuesReader delegate; - private final int boundedLength; - - public BoundedVarcharApacheParquetValueDecoder(ValuesReader delegate, VarcharType varcharType) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - checkArgument( - !varcharType.isUnbounded(), - "Trino type %s is not a bounded varchar", - varcharType); - this.boundedLength = varcharType.getBoundedLength(); - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(BinaryBuffer values, int offsetsIndex, int length) - { - for (int i = 0; i < length; i++) { - byte[] value = delegate.readBytes().getBytes(); - Slice slice = Varchars.truncateToLength(Slices.wrappedBuffer(value), boundedLength); - values.add(slice, i + offsetsIndex); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } - - public static final class CharApacheParquetValueDecoder - implements ValueDecoder - { - private final ValuesReader delegate; - private final int maxLength; - - public CharApacheParquetValueDecoder(ValuesReader delegate, CharType charType) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - this.maxLength = charType.getLength(); - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(BinaryBuffer values, int offsetsIndex, int length) - { - for (int i = 0; i < length; i++) { - byte[] value = delegate.readBytes().getBytes(); - Slice slice = Chars.trimTrailingSpaces(truncateToLength(Slices.wrappedBuffer(value), maxLength)); - values.add(slice, i + offsetsIndex); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } - - public static final class BinaryApacheParquetValueDecoder - implements ValueDecoder - { - private final ValuesReader delegate; - - public BinaryApacheParquetValueDecoder(ValuesReader delegate) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(BinaryBuffer values, int offsetsIndex, int length) - { - for (int i = 0; i < length; i++) { - byte[] value = delegate.readBytes().getBytes(); - values.add(value, i + offsetsIndex); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } - - public static final class UuidApacheParquetValueDecoder - implements ValueDecoder - { - private static final VarHandle LONG_ARRAY_HANDLE = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN); - - private final ValuesReader delegate; - - public UuidApacheParquetValueDecoder(ValuesReader delegate) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(long[] values, int offset, int length) - { - int endOffset = (offset + length) * 2; - for (int currentOutputOffset = offset * 2; currentOutputOffset < endOffset; currentOutputOffset += 2) { - byte[] data = delegate.readBytes().getBytes(); - values[currentOutputOffset] = (long) LONG_ARRAY_HANDLE.get(data, 0); - values[currentOutputOffset + 1] = (long) LONG_ARRAY_HANDLE.get(data, Long.BYTES); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } - public static final class Int96ApacheParquetValueDecoder implements ValueDecoder { diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/DeltaByteArrayDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/DeltaByteArrayDecoders.java new file mode 100644 index 000000000000..2d7f96c37160 --- /dev/null +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/DeltaByteArrayDecoders.java @@ -0,0 +1,337 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader.decoders; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.parquet.reader.SimpleSliceInputStream; +import io.trino.parquet.reader.flat.BinaryBuffer; +import io.trino.spi.type.CharType; +import io.trino.spi.type.VarcharType; + +import java.util.Arrays; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkPositionIndexes; +import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedIntDecoder; +import static io.trino.spi.type.Chars.byteCountWithoutTrailingSpace; +import static io.trino.spi.type.Varchars.byteCount; +import static java.lang.Math.max; +import static java.util.Objects.requireNonNull; + +/** + * Implementation of decoder for the encoding described at + * delta_byte_array + */ +public class DeltaByteArrayDecoders +{ + private DeltaByteArrayDecoders() {} + + public static final class BoundedVarcharDeltaByteArrayDecoder + extends AbstractDeltaByteArrayDecoder + { + private final int boundedLength; + + public BoundedVarcharDeltaByteArrayDecoder(VarcharType varcharType) + { + checkArgument( + !varcharType.isUnbounded(), + "Trino type %s is not a bounded varchar", + varcharType); + this.boundedLength = varcharType.getBoundedLength(); + } + + @Override + public void read(BinaryBuffer values, int offset, int length) + { + InputLengths lengths = getInputAndMaxLength(length); + int maxLength = lengths.maxInputLength(); + int totalInputLength = lengths.totalInputLength(); + boolean truncate = maxLength > boundedLength; + if (truncate) { + readBounded(values, offset, length, totalInputLength); + } + else { + readUnbounded(values, offset, length, totalInputLength); + } + } + + @Override + protected int truncatedLength(Slice slice, int offset, int length) + { + return byteCount(slice, offset, length, boundedLength); + } + } + + public static final class CharDeltaByteArrayDecoder + extends AbstractDeltaByteArrayDecoder + { + private final int maxLength; + + public CharDeltaByteArrayDecoder(CharType charType) + { + this.maxLength = charType.getLength(); + } + + @Override + public void read(BinaryBuffer values, int offset, int length) + { + int totalInputLength = getInputLength(length); + readBounded(values, offset, length, totalInputLength); + } + + @Override + protected int truncatedLength(Slice slice, int offset, int length) + { + return byteCountWithoutTrailingSpace(slice, offset, length, maxLength); + } + } + + public static final class BinaryDeltaByteArrayDecoder + extends AbstractDeltaByteArrayDecoder + { + @Override + protected int truncatedLength(Slice slice, int offset, int length) + { + throw new UnsupportedOperationException(); + } + + @Override + public void read(BinaryBuffer values, int offset, int length) + { + int totalInputLength = getInputLength(length); + readUnbounded(values, offset, length, totalInputLength); + } + } + + private abstract static class AbstractDeltaByteArrayDecoder + implements ValueDecoder + { + private int[] prefixLengths; + private int[] suffixLengths; + private int inputLengthsOffset; + // At the end of skip/read for each batch of positions, this field is + // populated with prefixLength bytes for the first position in the next read + private byte[] firstPrefix = new byte[0]; + private SimpleSliceInputStream input; + + @Override + public void init(SimpleSliceInputStream input) + { + this.input = requireNonNull(input, "input is null"); + this.prefixLengths = readDeltaEncodedLengths(input); + this.suffixLengths = readDeltaEncodedLengths(input); + } + + @Override + public void skip(int n) + { + checkPositionIndexes(inputLengthsOffset, inputLengthsOffset + n, prefixLengths.length); + if (n == 0) { + return; + } + + // If we've skipped to the end, there's no need to process anything + if (inputLengthsOffset + n == prefixLengths.length) { + inputLengthsOffset += n; + return; + } + + int totalSuffixesLength = getSuffixesLength(n); + Slice inputSlice = input.asSlice(); + // Start from the suffix and go back position by position to fill the prefix for next read + int bytesLeft = prefixLengths[inputLengthsOffset + n]; + byte[] newPrefix = new byte[bytesLeft]; + + int current = n - 1; + int inputOffset = totalSuffixesLength - suffixLengths[inputLengthsOffset + n - 1]; + while (bytesLeft > 0 && inputOffset >= 0) { + int currentPrefixLength = prefixLengths[inputLengthsOffset + current]; + if (currentPrefixLength < bytesLeft) { + int toCopy = bytesLeft - currentPrefixLength; + inputSlice.getBytes(inputOffset, newPrefix, currentPrefixLength, toCopy); + bytesLeft -= toCopy; + if (bytesLeft == 0) { + break; + } + } + inputOffset -= suffixLengths[inputLengthsOffset + current - 1]; + current--; + } + System.arraycopy(firstPrefix, 0, newPrefix, 0, bytesLeft); + firstPrefix = newPrefix; + + input.skip(totalSuffixesLength); + inputLengthsOffset += n; + } + + protected abstract int truncatedLength(Slice slice, int offset, int length); + + protected void readBounded(BinaryBuffer values, int offset, int length, int totalInputLength) + { + checkPositionIndexes(inputLengthsOffset, inputLengthsOffset + length, prefixLengths.length); + int[] outputOffsets = values.getOffsets(); + byte[] dataBuffer = readUnbounded(outputOffsets, offset, length, totalInputLength); + Slice inputSlice = Slices.wrappedBuffer(dataBuffer); + inputLengthsOffset += length; + + // Try to find the first truncated position + int i = 0; + int inputOffset = 0; + for (; i < length; i++) { + int inputLength = outputOffsets[offset + i + 1] - outputOffsets[offset + i]; + int outputLength = truncatedLength(inputSlice, inputOffset, inputLength); + if (inputLength != outputLength) { + break; + } + inputOffset += inputLength; + } + + if (i == length) { + // No trimming or truncating took place + values.addChunk(inputSlice); + return; + } + + // Resume the iteration, this time shifting positions left according to trimming/truncation + int outputOffset = inputOffset; + int nextOffset = outputOffsets[offset + i]; + for (; i < length; i++) { + int currentOffset = nextOffset; + nextOffset = outputOffsets[offset + i + 1]; + int inputLength = nextOffset - currentOffset; + int outputLength = truncatedLength(inputSlice, inputOffset, inputLength); + System.arraycopy(dataBuffer, inputOffset, dataBuffer, outputOffset, outputLength); + outputOffsets[offset + i + 1] = outputOffsets[offset + i] + outputLength; + inputOffset += inputLength; + outputOffset += outputLength; + } + + values.addChunk(inputSlice.slice(0, outputOffset)); + } + + protected void readUnbounded(BinaryBuffer values, int offset, int length, int totalInputLength) + { + checkPositionIndexes(inputLengthsOffset, inputLengthsOffset + length, prefixLengths.length); + int[] outputOffsets = values.getOffsets(); + Slice outputBuffer = Slices.wrappedBuffer(readUnbounded(outputOffsets, offset, length, totalInputLength)); + values.addChunk(outputBuffer); + inputLengthsOffset += length; + } + + protected int getSuffixesLength(int length) + { + int totalSuffixesLength = 0; + for (int i = 0; i < length; i++) { + totalSuffixesLength += suffixLengths[inputLengthsOffset + i]; + } + return totalSuffixesLength; + } + + protected int getInputLength(int length) + { + int totalInputLength = 0; + for (int i = 0; i < length; i++) { + totalInputLength += prefixLengths[inputLengthsOffset + i] + suffixLengths[inputLengthsOffset + i]; + } + return totalInputLength; + } + + protected InputLengths getInputAndMaxLength(int length) + { + int totalInputLength = 0; + int maxLength = 0; + for (int i = 0; i < length; i++) { + int inputLength = prefixLengths[inputLengthsOffset + i] + suffixLengths[inputLengthsOffset + i]; + totalInputLength += inputLength; + maxLength = max(maxLength, inputLength); + } + return new InputLengths(totalInputLength, maxLength); + } + + protected record InputLengths(int totalInputLength, int maxInputLength) {} + + private byte[] readUnbounded(int[] outputOffsets, int offset, int length, int totalInputLength) + { + byte[] output = new byte[totalInputLength]; + Slice inputSlice = input.asSlice(); + // System#arraycopy performs better than Slice#getBytes, therefore we + // process the input as a byte array rather than through SimpleSliceInputStream#readBytes + byte[] inputBytes; + int inputOffsetStart; + if (inputSlice.length() != 0) { + inputBytes = inputSlice.byteArray(); + inputOffsetStart = inputSlice.byteArrayOffset(); + } + else { + inputBytes = new byte[0]; + inputOffsetStart = 0; + } + int inputOffset = inputOffsetStart; + + // Read first position by copying prefix from previous read + outputOffsets[offset + 1] = outputOffsets[offset] + prefixLengths[inputLengthsOffset] + suffixLengths[inputLengthsOffset]; + System.arraycopy(firstPrefix, 0, output, 0, prefixLengths[inputLengthsOffset]); + int outputOffset = prefixLengths[inputLengthsOffset]; + int outputLength = suffixLengths[inputLengthsOffset]; + + // Read remaining length - 1 positions + for (int i = 1; i < length; i++) { + int prefixLength = prefixLengths[inputLengthsOffset + i]; + int suffixLength = suffixLengths[inputLengthsOffset + i]; + outputOffsets[offset + i + 1] = outputOffsets[offset + i] + prefixLength + suffixLength; + + // prefixLength of 0 is a common case, batching arraycopy calls for continuous runs of 0s + // performs better than copying position by position + if (prefixLength > 0) { + // Copy all previous continuous suffixes + System.arraycopy(inputBytes, inputOffset, output, outputOffset, outputLength); + inputOffset += outputLength; + outputOffset += outputLength; + outputLength = 0; + + // Copy the current prefix + int previousPositionLength = prefixLengths[inputLengthsOffset + i - 1] + suffixLengths[inputLengthsOffset + i - 1]; + int previousOutputStart = outputOffset - previousPositionLength; + System.arraycopy(output, previousOutputStart, output, outputOffset, prefixLength); + outputOffset += prefixLength; + } + outputLength += suffixLength; + } + // Copy any remaining suffixes + System.arraycopy(inputBytes, inputOffset, output, outputOffset, outputLength); + inputOffset += outputLength; + outputOffset += outputLength; + input.skip(inputOffset - inputOffsetStart); + + if (inputLengthsOffset + length < prefixLengths.length) { + // Prepare prefix for next read if end of input has not been reached + int previousPositionLength = prefixLengths[inputLengthsOffset + length - 1] + suffixLengths[inputLengthsOffset + length - 1]; + int previousOutputStart = outputOffset - previousPositionLength; + firstPrefix = Arrays.copyOfRange(output, previousOutputStart, previousOutputStart + prefixLengths[inputLengthsOffset + length]); + } + return output; + } + } + + private static int[] readDeltaEncodedLengths(SimpleSliceInputStream input) + { + DeltaBinaryPackedIntDecoder decoder = new DeltaBinaryPackedIntDecoder(); + decoder.init(input); + int valueCount = decoder.getValueCount(); + int[] lengths = new int[valueCount]; + decoder.read(lengths, 0, valueCount); + return lengths; + } +} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/TransformingValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/TransformingValueDecoders.java index ce091fcb8057..018911c4e4e2 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/TransformingValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/TransformingValueDecoders.java @@ -24,13 +24,18 @@ import io.trino.spi.type.Int128; import io.trino.spi.type.TimestampType; import io.trino.spi.type.TimestampWithTimeZoneType; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.joda.time.DateTimeZone; import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; import static io.trino.parquet.ParquetReaderUtils.toByteExact; import static io.trino.parquet.ParquetReaderUtils.toShortExact; +import static io.trino.parquet.ParquetTypeUtils.checkBytesFitInShortDecimal; import static io.trino.parquet.ParquetTypeUtils.getShortDecimalValue; +import static io.trino.parquet.reader.decoders.DeltaByteArrayDecoders.BinaryDeltaByteArrayDecoder; import static io.trino.parquet.reader.decoders.ValueDecoders.getBinaryDecoder; import static io.trino.parquet.reader.decoders.ValueDecoders.getInt32Decoder; import static io.trino.parquet.reader.decoders.ValueDecoders.getInt96Decoder; @@ -439,6 +444,24 @@ public void skip(int n) } public static ValueDecoder getBinaryLongDecimalDecoder(ParquetEncoding encoding, PrimitiveField field) + { + return new BinaryToLongDecimalTransformDecoder(getBinaryDecoder(encoding, field)); + } + + public static ValueDecoder getDeltaFixedWidthLongDecimalDecoder(ParquetEncoding encoding, PrimitiveField field) + { + checkArgument(encoding.equals(DELTA_BYTE_ARRAY), "encoding %s is not DELTA_BYTE_ARRAY", encoding); + ColumnDescriptor descriptor = field.getDescriptor(); + LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); + checkArgument( + logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation + && decimalAnnotation.getPrecision() > Decimals.MAX_SHORT_PRECISION, + "Column %s is not a long decimal", + descriptor); + return new BinaryToLongDecimalTransformDecoder(new BinaryDeltaByteArrayDecoder()); + } + + public static ValueDecoder getBinaryShortDecimalDecoder(ParquetEncoding encoding, PrimitiveField field) { ValueDecoder delegate = getBinaryDecoder(encoding, field); return new ValueDecoder<>() @@ -455,14 +478,17 @@ public void read(long[] values, int offset, int length) BinaryBuffer buffer = new BinaryBuffer(length); delegate.read(buffer, 0, length); int[] offsets = buffer.getOffsets(); - Slice binaryInput = buffer.asSlice(); + byte[] inputBytes = buffer.asSlice().byteArray(); for (int i = 0; i < length; i++) { int positionOffset = offsets[i]; int positionLength = offsets[i + 1] - positionOffset; - Int128 value = Int128.fromBigEndian(binaryInput.getBytes(positionOffset, positionLength)); - values[2 * (offset + i)] = value.getHigh(); - values[2 * (offset + i) + 1] = value.getLow(); + if (positionLength > 8) { + throw new ParquetDecodingException("Unable to read BINARY type decimal of size " + positionLength + " as a short decimal"); + } + // No need for checkBytesFitInShortDecimal as the standard requires variable binary decimals + // to be stored in minimum possible number of bytes + values[offset + i] = getShortDecimalValue(inputBytes, positionOffset, positionLength); } } @@ -474,11 +500,22 @@ public void skip(int n) }; } - public static ValueDecoder getBinaryShortDecimalDecoder(ParquetEncoding encoding, PrimitiveField field) + public static ValueDecoder getDeltaFixedWidthShortDecimalDecoder(ParquetEncoding encoding, PrimitiveField field) { - ValueDecoder delegate = getBinaryDecoder(encoding, field); + checkArgument(encoding.equals(DELTA_BYTE_ARRAY), "encoding %s is not DELTA_BYTE_ARRAY", encoding); + ColumnDescriptor descriptor = field.getDescriptor(); + LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); + checkArgument( + logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation + && decimalAnnotation.getPrecision() <= Decimals.MAX_SHORT_PRECISION, + "Column %s is not a short decimal", + descriptor); + int typeLength = descriptor.getPrimitiveType().getTypeLength(); + checkArgument(typeLength > 0 && typeLength <= 16, "Expected column %s to have type length in range (1-16)", descriptor); return new ValueDecoder<>() { + private final ValueDecoder delegate = new BinaryDeltaByteArrayDecoder(); + @Override public void init(SimpleSliceInputStream input) { @@ -490,18 +527,21 @@ public void read(long[] values, int offset, int length) { BinaryBuffer buffer = new BinaryBuffer(length); delegate.read(buffer, 0, length); - int[] offsets = buffer.getOffsets(); - byte[] inputBytes = buffer.asSlice().byteArray(); + // Each position in FIXED_LEN_BYTE_ARRAY has fixed length + int bytesOffset = 0; + int bytesLength = typeLength; + if (typeLength > Long.BYTES) { + bytesOffset = typeLength - Long.BYTES; + bytesLength = Long.BYTES; + } + + byte[] inputBytes = buffer.asSlice().byteArray(); + int[] offsets = buffer.getOffsets(); for (int i = 0; i < length; i++) { - int positionOffset = offsets[i]; - int positionLength = offsets[i + 1] - positionOffset; - if (positionLength > 8) { - throw new ParquetDecodingException("Unable to read BINARY type decimal of size " + positionLength + " as a short decimal"); - } - // No need for checkBytesFitInShortDecimal as the standard requires variable binary decimals - // to be stored in minimum possible number of bytes - values[offset + i] = getShortDecimalValue(inputBytes, positionOffset, positionLength); + int inputOffset = offsets[i]; + checkBytesFitInShortDecimal(inputBytes, inputOffset, bytesOffset, descriptor); + values[offset + i] = getShortDecimalValue(inputBytes, inputOffset + bytesOffset, bytesLength); } } @@ -682,6 +722,40 @@ public static ValueDecoder getShortDecimalToByteDecoder(ParquetEncoding return new LongToByteTransformDecoder(getShortDecimalDecoder(encoding, field)); } + public static ValueDecoder getDeltaUuidDecoder(ParquetEncoding encoding) + { + checkArgument(encoding.equals(DELTA_BYTE_ARRAY), "encoding %s is not DELTA_BYTE_ARRAY", encoding); + ValueDecoder delegate = new BinaryDeltaByteArrayDecoder(); + return new ValueDecoder<>() + { + @Override + public void init(SimpleSliceInputStream input) + { + delegate.init(input); + } + + @Override + public void read(long[] values, int offset, int length) + { + BinaryBuffer buffer = new BinaryBuffer(length); + delegate.read(buffer, 0, length); + SimpleSliceInputStream binaryInput = new SimpleSliceInputStream(buffer.asSlice()); + + int endOffset = (offset + length) * 2; + for (int outputOffset = offset * 2; outputOffset < endOffset; outputOffset += 2) { + values[outputOffset] = binaryInput.readLong(); + values[outputOffset + 1] = binaryInput.readLong(); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + }; + } + private static class LongToIntTransformDecoder implements ValueDecoder { @@ -781,6 +855,46 @@ public void skip(int n) } } + private static class BinaryToLongDecimalTransformDecoder + implements ValueDecoder + { + private final ValueDecoder delegate; + + private BinaryToLongDecimalTransformDecoder(ValueDecoder delegate) + { + this.delegate = delegate; + } + + @Override + public void init(SimpleSliceInputStream input) + { + delegate.init(input); + } + + @Override + public void read(long[] values, int offset, int length) + { + BinaryBuffer buffer = new BinaryBuffer(length); + delegate.read(buffer, 0, length); + int[] offsets = buffer.getOffsets(); + Slice binaryInput = buffer.asSlice(); + + for (int i = 0; i < length; i++) { + int positionOffset = offsets[i]; + int positionLength = offsets[i + 1] - positionOffset; + Int128 value = Int128.fromBigEndian(binaryInput.getBytes(positionOffset, positionLength)); + values[2 * (offset + i)] = value.getHigh(); + values[2 * (offset + i) + 1] = value.getLow(); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + private static class InlineTransformDecoder implements ValueDecoder { diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java index 3b4278df57ab..d2061a3cab25 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java @@ -29,18 +29,15 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ValuesType.VALUES; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BinaryApacheParquetValueDecoder; import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BooleanApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BoundedVarcharApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.CharApacheParquetValueDecoder; import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.Int96ApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.LongDecimalApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.ShortDecimalApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.UuidApacheParquetValueDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedByteDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedIntDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedLongDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedShortDecoder; +import static io.trino.parquet.reader.decoders.DeltaByteArrayDecoders.BinaryDeltaByteArrayDecoder; +import static io.trino.parquet.reader.decoders.DeltaByteArrayDecoders.BoundedVarcharDeltaByteArrayDecoder; +import static io.trino.parquet.reader.decoders.DeltaByteArrayDecoders.CharDeltaByteArrayDecoder; import static io.trino.parquet.reader.decoders.DeltaLengthByteArrayDecoders.BinaryDeltaLengthDecoder; import static io.trino.parquet.reader.decoders.DeltaLengthByteArrayDecoders.BoundedVarcharDeltaLengthDecoder; import static io.trino.parquet.reader.decoders.DeltaLengthByteArrayDecoders.CharDeltaLengthDecoder; @@ -57,6 +54,9 @@ import static io.trino.parquet.reader.decoders.PlainValueDecoders.UuidPlainValueDecoder; import static io.trino.parquet.reader.decoders.TransformingValueDecoders.getBinaryLongDecimalDecoder; import static io.trino.parquet.reader.decoders.TransformingValueDecoders.getBinaryShortDecimalDecoder; +import static io.trino.parquet.reader.decoders.TransformingValueDecoders.getDeltaFixedWidthLongDecimalDecoder; +import static io.trino.parquet.reader.decoders.TransformingValueDecoders.getDeltaFixedWidthShortDecimalDecoder; +import static io.trino.parquet.reader.decoders.TransformingValueDecoders.getDeltaUuidDecoder; import static io.trino.parquet.reader.decoders.TransformingValueDecoders.getInt32ToLongDecoder; import static io.trino.parquet.reader.decoders.TransformingValueDecoders.getInt64ToByteDecoder; import static io.trino.parquet.reader.decoders.TransformingValueDecoders.getInt64ToIntDecoder; @@ -120,8 +120,7 @@ public static ValueDecoder getUuidDecoder(ParquetEncoding encoding, Prim { return switch (encoding) { case PLAIN -> new UuidPlainValueDecoder(); - case DELTA_BYTE_ARRAY -> - new UuidApacheParquetValueDecoder(getApacheParquetReader(encoding, field)); + case DELTA_BYTE_ARRAY -> getDeltaUuidDecoder(encoding); default -> throw wrongEncoding(encoding, field); }; } @@ -183,9 +182,7 @@ public static ValueDecoder getFixedWidthShortDecimalDecoder(ParquetEncod { return switch (encoding) { case PLAIN -> new ShortDecimalFixedLengthByteArrayDecoder(field.getDescriptor()); - case DELTA_BYTE_ARRAY -> new ShortDecimalApacheParquetValueDecoder( - getApacheParquetReader(encoding, field), - field.getDescriptor()); + case DELTA_BYTE_ARRAY -> getDeltaFixedWidthShortDecimalDecoder(encoding, field); default -> throw wrongEncoding(encoding, field); }; } @@ -194,8 +191,7 @@ public static ValueDecoder getFixedWidthLongDecimalDecoder(ParquetEncodi { return switch (encoding) { case PLAIN -> new LongDecimalPlainValueDecoder(field.getDescriptor().getPrimitiveType().getTypeLength()); - case DELTA_BYTE_ARRAY -> - new LongDecimalApacheParquetValueDecoder(getApacheParquetReader(encoding, field)); + case DELTA_BYTE_ARRAY -> getDeltaFixedWidthLongDecimalDecoder(encoding, field); default -> throw wrongEncoding(encoding, field); }; } @@ -210,8 +206,7 @@ public static ValueDecoder getBoundedVarcharBinaryDecoder(ParquetE return switch (encoding) { case PLAIN -> new BoundedVarcharPlainValueDecoder((VarcharType) trinoType); case DELTA_LENGTH_BYTE_ARRAY -> new BoundedVarcharDeltaLengthDecoder((VarcharType) trinoType); - case DELTA_BYTE_ARRAY -> - new BoundedVarcharApacheParquetValueDecoder(getApacheParquetReader(encoding, field), (VarcharType) trinoType); + case DELTA_BYTE_ARRAY -> new BoundedVarcharDeltaByteArrayDecoder((VarcharType) trinoType); default -> throw wrongEncoding(encoding, field); }; } @@ -226,8 +221,7 @@ public static ValueDecoder getCharBinaryDecoder(ParquetEncoding en return switch (encoding) { case PLAIN -> new CharPlainValueDecoder((CharType) trinoType); case DELTA_LENGTH_BYTE_ARRAY -> new CharDeltaLengthDecoder((CharType) trinoType); - case DELTA_BYTE_ARRAY -> - new CharApacheParquetValueDecoder(getApacheParquetReader(encoding, field), (CharType) trinoType); + case DELTA_BYTE_ARRAY -> new CharDeltaByteArrayDecoder((CharType) trinoType); default -> throw wrongEncoding(encoding, field); }; } @@ -237,8 +231,7 @@ public static ValueDecoder getBinaryDecoder(ParquetEncoding encodi return switch (encoding) { case PLAIN -> new BinaryPlainValueDecoder(); case DELTA_LENGTH_BYTE_ARRAY -> new BinaryDeltaLengthDecoder(); - case DELTA_BYTE_ARRAY -> - new BinaryApacheParquetValueDecoder(getApacheParquetReader(encoding, field)); + case DELTA_BYTE_ARRAY -> new BinaryDeltaByteArrayDecoder(); default -> throw wrongEncoding(encoding, field); }; } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkLongDecimalColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkLongDecimalColumnReader.java index 806923147416..f4d2c35e07af 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkLongDecimalColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkLongDecimalColumnReader.java @@ -15,21 +15,27 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.trino.parquet.ParquetEncoding; import io.trino.parquet.PrimitiveField; import io.trino.spi.type.DecimalType; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; +import org.openjdk.jmh.annotations.Param; import java.util.Random; import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; +import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.reader.TestData.randomBigInteger; +import static java.lang.String.format; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; public class BenchmarkLongDecimalColumnReader @@ -39,6 +45,12 @@ public class BenchmarkLongDecimalColumnReader private final Random random = new Random(1); + @Param({ + "PLAIN", + "DELTA_BYTE_ARRAY", + }) + public ParquetEncoding encoding; + @Override protected PrimitiveField createPrimitiveField() { @@ -56,7 +68,13 @@ protected PrimitiveField createPrimitiveField() @Override protected ValuesWriter createValuesWriter(int bufferSize) { - return new FixedLenByteArrayPlainValuesWriter(LENGTH, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + if (encoding.equals(PLAIN)) { + return new FixedLenByteArrayPlainValuesWriter(LENGTH, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + } + else if (encoding.equals(DELTA_BYTE_ARRAY)) { + return new DeltaByteArrayWriter(bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + } + throw new UnsupportedOperationException(format("encoding %s is not supported", encoding)); } @Override diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkShortDecimalColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkShortDecimalColumnReader.java index e5345ba8c506..40a90878f4db 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkShortDecimalColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkShortDecimalColumnReader.java @@ -13,11 +13,13 @@ */ package io.trino.parquet.reader; +import io.trino.parquet.ParquetEncoding; import io.trino.parquet.PrimitiveField; import io.trino.spi.type.DecimalType; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -25,9 +27,12 @@ import org.apache.parquet.schema.Types; import org.openjdk.jmh.annotations.Param; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; +import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.reader.TestData.longToBytes; import static io.trino.parquet.reader.TestData.maxPrecision; import static io.trino.parquet.reader.TestData.unscaledRandomShortDecimalSupplier; +import static java.lang.String.format; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; public class BenchmarkShortDecimalColumnReader @@ -38,6 +43,12 @@ public class BenchmarkShortDecimalColumnReader }) public int byteArrayLength; + @Param({ + "PLAIN", + "DELTA_BYTE_ARRAY", + }) + public ParquetEncoding encoding; + @Override protected PrimitiveField createPrimitiveField() { @@ -56,7 +67,13 @@ protected PrimitiveField createPrimitiveField() @Override protected ValuesWriter createValuesWriter(int bufferSize) { - return new FixedLenByteArrayPlainValuesWriter(byteArrayLength, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + if (encoding.equals(PLAIN)) { + return new FixedLenByteArrayPlainValuesWriter(byteArrayLength, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + } + else if (encoding.equals(DELTA_BYTE_ARRAY)) { + return new DeltaByteArrayWriter(bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + } + throw new UnsupportedOperationException(format("encoding %s is not supported", encoding)); } @Override diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkUuidColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkUuidColumnReader.java new file mode 100644 index 000000000000..46671c1076fb --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkUuidColumnReader.java @@ -0,0 +1,101 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.parquet.ParquetEncoding; +import io.trino.parquet.PrimitiveField; +import io.trino.spi.type.UuidType; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.openjdk.jmh.annotations.Param; + +import java.util.UUID; + +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; +import static io.trino.parquet.ParquetEncoding.PLAIN; +import static java.lang.String.format; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; + +public class BenchmarkUuidColumnReader + extends AbstractColumnReaderBenchmark +{ + private static final int LENGTH = 2 * SIZE_OF_LONG; + + @Param({ + "PLAIN", + "DELTA_BYTE_ARRAY", + }) + public ParquetEncoding encoding; + + @Override + protected PrimitiveField createPrimitiveField() + { + PrimitiveType parquetType = Types.optional(FIXED_LEN_BYTE_ARRAY) + .length(LENGTH) + .as(LogicalTypeAnnotation.uuidType()) + .named("name"); + return new PrimitiveField( + UuidType.UUID, + true, + new ColumnDescriptor(new String[] {"test"}, parquetType, 0, 0), + 0); + } + + @Override + protected ValuesWriter createValuesWriter(int bufferSize) + { + if (encoding.equals(PLAIN)) { + return new FixedLenByteArrayPlainValuesWriter(LENGTH, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + } + else if (encoding.equals(DELTA_BYTE_ARRAY)) { + return new DeltaByteArrayWriter(bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + } + throw new UnsupportedOperationException(format("encoding %s is not supported", encoding)); + } + + @Override + protected void writeValue(ValuesWriter writer, long[] batch, int index) + { + Slice slice = Slices.wrappedLongArray(batch, index * 2, 2); + writer.writeBytes(Binary.fromConstantByteArray(slice.getBytes())); + } + + @Override + protected long[] generateDataBatch(int size) + { + long[] batch = new long[size * 2]; + for (int i = 0; i < size; i++) { + UUID uuid = UUID.randomUUID(); + batch[i * 2] = uuid.getMostSignificantBits(); + batch[(i * 2) + 1] = uuid.getLeastSignificantBits(); + } + return batch; + } + + public static void main(String[] args) + throws Exception + { + run(BenchmarkUuidColumnReader.class); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java index cb7203476ca0..8fb9f6ed9bdd 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java @@ -20,6 +20,7 @@ import java.io.IOException; import static io.trino.parquet.ParquetEncoding.DELTA_BINARY_PACKED; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.PLAIN; public class TestColumnReaderBenchmark @@ -95,4 +96,28 @@ public void testLongColumnReaderBenchmark() } } } + + @Test + public void testLongDecimalColumnReaderBenchmark() + throws IOException + { + for (ParquetEncoding encoding : ImmutableList.of(PLAIN, DELTA_BYTE_ARRAY)) { + BenchmarkLongDecimalColumnReader benchmark = new BenchmarkLongDecimalColumnReader(); + benchmark.encoding = encoding; + benchmark.setup(); + benchmark.read(); + } + } + + @Test + public void testUuidColumnReaderBenchmark() + throws IOException + { + for (ParquetEncoding encoding : ImmutableList.of(PLAIN, DELTA_BYTE_ARRAY)) { + BenchmarkUuidColumnReader benchmark = new BenchmarkUuidColumnReader(); + benchmark.encoding = encoding; + benchmark.setup(); + benchmark.read(); + } + } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestShortDecimalColumnReaderBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestShortDecimalColumnReaderBenchmark.java index 9ab7f6c80c77..fab2dc1bf14b 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestShortDecimalColumnReaderBenchmark.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestShortDecimalColumnReaderBenchmark.java @@ -13,10 +13,15 @@ */ package io.trino.parquet.reader; +import com.google.common.collect.ImmutableList; +import io.trino.parquet.ParquetEncoding; import org.testng.annotations.Test; import java.io.IOException; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; +import static io.trino.parquet.ParquetEncoding.PLAIN; + public class TestShortDecimalColumnReaderBenchmark { @Test @@ -24,10 +29,13 @@ public void testShortDecimalColumnReaderBenchmark() throws IOException { for (int typeLength = 1; typeLength <= 8; typeLength++) { - BenchmarkShortDecimalColumnReader benchmark = new BenchmarkShortDecimalColumnReader(); - benchmark.byteArrayLength = typeLength; - benchmark.setup(); - benchmark.read(); + for (ParquetEncoding encoding : ImmutableList.of(PLAIN, DELTA_BYTE_ARRAY)) { + BenchmarkShortDecimalColumnReader benchmark = new BenchmarkShortDecimalColumnReader(); + benchmark.byteArrayLength = typeLength; + benchmark.encoding = encoding; + benchmark.setup(); + benchmark.read(); + } } } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java index e81674071ad8..cfe0a0e8e451 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java @@ -34,6 +34,7 @@ import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter; import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; import org.apache.parquet.column.values.plain.PlainValuesWriter; @@ -62,6 +63,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.parquet.ParquetEncoding.DELTA_BINARY_PACKED; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.DELTA_LENGTH_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ParquetEncoding.PLAIN_DICTIONARY; @@ -85,6 +87,7 @@ import static org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; public abstract class AbstractValueDecodersTest { @@ -387,6 +390,12 @@ private static ValuesWriter getValuesWriter(ParquetEncoding encoding, PrimitiveT } throw new IllegalArgumentException("Delta length byte array encoding writer is not supported for type " + typeName); } + if (encoding.equals(DELTA_BYTE_ARRAY)) { + if (typeName.equals(BINARY) || typeName.equals(FIXED_LEN_BYTE_ARRAY)) { + return new DeltaByteArrayWriter(MAX_DATA_SIZE, MAX_DATA_SIZE, HeapByteBufferAllocator.getInstance()); + } + throw new IllegalArgumentException("Delta byte array encoding writer is not supported for type " + typeName); + } throw new UnsupportedOperationException(format("Encoding %s is not supported", encoding)); } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestByteArrayValueDecoders.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestByteArrayValueDecoders.java index 86ed7b0d96f8..0cc75c056b68 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestByteArrayValueDecoders.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestByteArrayValueDecoders.java @@ -14,10 +14,17 @@ package io.trino.parquet.reader.decoders; import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.parquet.ParquetEncoding; +import io.trino.parquet.reader.SimpleSliceInputStream; import io.trino.parquet.reader.flat.BinaryBuffer; import io.trino.spi.type.CharType; +import io.trino.spi.type.Chars; import io.trino.spi.type.VarcharType; +import io.trino.spi.type.Varchars; import io.trino.testing.DataProviders; +import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.api.Binary; @@ -29,26 +36,29 @@ import java.util.function.Function; import java.util.function.Supplier; +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.DELTA_LENGTH_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ParquetEncoding.RLE_DICTIONARY; import static io.trino.parquet.reader.TestData.randomAsciiData; import static io.trino.parquet.reader.TestData.randomBinaryData; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BinaryApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BoundedVarcharApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.CharApacheParquetValueDecoder; import static io.trino.parquet.reader.flat.BinaryColumnAdapter.BINARY_ADAPTER; import static io.trino.spi.type.CharType.createCharType; import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static io.trino.spi.type.VarcharType.createVarcharType; +import static io.trino.spi.type.Varchars.truncateToLength; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.assertj.core.api.Assertions.assertThat; public final class TestByteArrayValueDecoders extends AbstractValueDecodersTest { + private static final List ENCODINGS = ImmutableList.of(PLAIN, RLE_DICTIONARY, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY); + private static final BiConsumer BINARY_ASSERT = (actual, expected) -> { assertThat(actual.getOffsets()).containsExactly(expected.getOffsets()); assertThat(actual.asSlice()).isEqualTo(expected.asSlice()); @@ -65,7 +75,7 @@ protected Object[][] tests() BinaryApacheParquetValueDecoder::new, BINARY_ADAPTER, BINARY_ASSERT), - ImmutableList.of(PLAIN, RLE_DICTIONARY, DELTA_LENGTH_BYTE_ARRAY), + ENCODINGS, generateUnboundedBinaryInputs()), testArgs( new TestType<>( @@ -74,16 +84,10 @@ protected Object[][] tests() BinaryApacheParquetValueDecoder::new, BINARY_ADAPTER, BINARY_ASSERT), - ImmutableList.of(PLAIN, RLE_DICTIONARY, DELTA_LENGTH_BYTE_ARRAY), + ENCODINGS, generateUnboundedBinaryInputs()), - testArgs( - createBoundedVarcharTestType(), - ImmutableList.of(PLAIN, RLE_DICTIONARY, DELTA_LENGTH_BYTE_ARRAY), - generateBoundedVarcharInputs()), - testArgs( - createCharTestType(), - ImmutableList.of(PLAIN, RLE_DICTIONARY, DELTA_LENGTH_BYTE_ARRAY), - generateCharInputs())); + testArgs(createBoundedVarcharTestType(), ENCODINGS, generateBoundedVarcharInputs()), + testArgs(createCharTestType(), ENCODINGS, generateCharInputs())); } private static TestType createBoundedVarcharTestType() @@ -292,4 +296,110 @@ private static DataBuffer writeBytes(ValuesWriter valuesWriter, byte[][] input) return getWrittenBuffer(valuesWriter); } + + private static final class BoundedVarcharApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + private final int boundedLength; + + public BoundedVarcharApacheParquetValueDecoder(ValuesReader delegate, VarcharType varcharType) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + checkArgument( + !varcharType.isUnbounded(), + "Trino type %s is not a bounded varchar", + varcharType); + this.boundedLength = varcharType.getBoundedLength(); + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate); + } + + @Override + public void read(BinaryBuffer values, int offsetsIndex, int length) + { + for (int i = 0; i < length; i++) { + byte[] value = delegate.readBytes().getBytes(); + Slice slice = Varchars.truncateToLength(Slices.wrappedBuffer(value), boundedLength); + values.add(slice, i + offsetsIndex); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + + private static final class CharApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + private final int maxLength; + + public CharApacheParquetValueDecoder(ValuesReader delegate, CharType charType) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.maxLength = charType.getLength(); + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate); + } + + @Override + public void read(BinaryBuffer values, int offsetsIndex, int length) + { + for (int i = 0; i < length; i++) { + byte[] value = delegate.readBytes().getBytes(); + Slice slice = Chars.trimTrailingSpaces(truncateToLength(Slices.wrappedBuffer(value), maxLength)); + values.add(slice, i + offsetsIndex); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + + private static final class BinaryApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + + public BinaryApacheParquetValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate); + } + + @Override + public void read(BinaryBuffer values, int offsetsIndex, int length) + { + for (int i = 0; i < length; i++) { + byte[] value = delegate.readBytes().getBytes(); + values.add(value, i + offsetsIndex); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestFixedWidthByteArrayValueDecoders.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestFixedWidthByteArrayValueDecoders.java index 13ee45972ce4..6af0b9a1ba29 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestFixedWidthByteArrayValueDecoders.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestFixedWidthByteArrayValueDecoders.java @@ -13,30 +13,46 @@ */ package io.trino.parquet.reader.decoders; +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slices; import io.trino.parquet.ParquetEncoding; import io.trino.parquet.PrimitiveField; +import io.trino.parquet.reader.SimpleSliceInputStream; import io.trino.spi.type.DecimalType; +import io.trino.spi.type.Decimals; +import io.trino.spi.type.Int128; +import io.trino.spi.type.UuidType; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.math.BigInteger; +import java.nio.ByteOrder; import java.util.OptionalInt; import java.util.Random; +import java.util.UUID; import java.util.stream.IntStream; +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ParquetEncoding.RLE_DICTIONARY; +import static io.trino.parquet.ParquetTypeUtils.checkBytesFitInShortDecimal; +import static io.trino.parquet.ParquetTypeUtils.getShortDecimalValue; import static io.trino.parquet.ParquetTypeUtils.paddingBigInteger; import static io.trino.parquet.reader.TestData.longToBytes; import static io.trino.parquet.reader.TestData.maxPrecision; import static io.trino.parquet.reader.TestData.unscaledRandomShortDecimalSupplier; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.LongDecimalApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.ShortDecimalApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ValueDecoders.getFixedWidthShortDecimalDecoder; import static io.trino.parquet.reader.flat.Int128ColumnAdapter.INT128_ADAPTER; import static io.trino.parquet.reader.flat.LongColumnAdapter.LONG_ADAPTER; import static io.trino.testing.DataProviders.concat; import static java.lang.Math.min; +import static java.util.Objects.requireNonNull; +import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; import static org.assertj.core.api.Assertions.assertThat; @@ -48,9 +64,12 @@ protected Object[][] tests() { return concat( generateShortDecimalTests(PLAIN), + generateShortDecimalTests(DELTA_BYTE_ARRAY), generateShortDecimalTests(RLE_DICTIONARY), generateLongDecimalTests(PLAIN), - generateLongDecimalTests(RLE_DICTIONARY)); + generateLongDecimalTests(DELTA_BYTE_ARRAY), + generateLongDecimalTests(RLE_DICTIONARY), + generateUuidTests()); } private static Object[][] generateShortDecimalTests(ParquetEncoding encoding) @@ -76,6 +95,16 @@ private static Object[][] generateLongDecimalTests(ParquetEncoding encoding) .toArray(Object[][]::new); } + private static Object[][] generateUuidTests() + { + return ImmutableList.of(PLAIN, DELTA_BYTE_ARRAY).stream() + .map(encoding -> new Object[] { + createUuidTestType(), + encoding, + new UuidInputProvider()}) + .toArray(Object[][]::new); + } + private static TestType createShortDecimalTestType(int typeLength, int precision) { DecimalType decimalType = DecimalType.createDecimalType(precision, 2); @@ -100,6 +129,16 @@ private static TestType createLongDecimalTestType(int typeLength) (actual, expected) -> assertThat(actual).isEqualTo(expected)); } + private static TestType createUuidTestType() + { + return new TestType<>( + createField(FIXED_LEN_BYTE_ARRAY, OptionalInt.of(16), UuidType.UUID), + ValueDecoders::getUuidDecoder, + UuidApacheParquetValueDecoder::new, + INT128_ADAPTER, + (actual, expected) -> assertThat(actual).isEqualTo(expected)); + } + private static InputDataProvider createShortDecimalInputDataProvider(int typeLength, int precision) { return new InputDataProvider() { @@ -146,6 +185,30 @@ public String toString() }; } + private static class UuidInputProvider + implements InputDataProvider + { + @Override + public DataBuffer write(ValuesWriter valuesWriter, int dataSize) + { + byte[][] bytes = new byte[dataSize][]; + for (int i = 0; i < dataSize; i++) { + UUID uuid = UUID.randomUUID(); + bytes[i] = Slices.wrappedLongArray( + uuid.getMostSignificantBits(), + uuid.getLeastSignificantBits()) + .getBytes(); + } + return writeBytes(valuesWriter, bytes); + } + + @Override + public String toString() + { + return "uuid"; + } + } + private static DataBuffer writeBytes(ValuesWriter valuesWriter, byte[][] input) { for (byte[] value : input) { @@ -154,4 +217,124 @@ private static DataBuffer writeBytes(ValuesWriter valuesWriter, byte[][] input) return getWrittenBuffer(valuesWriter); } + + private static final class ShortDecimalApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + private final ColumnDescriptor descriptor; + private final int typeLength; + + private ShortDecimalApacheParquetValueDecoder(ValuesReader delegate, ColumnDescriptor descriptor) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); + checkArgument( + logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation + && decimalAnnotation.getPrecision() <= Decimals.MAX_SHORT_PRECISION, + "Column %s is not a short decimal", + descriptor); + this.typeLength = descriptor.getPrimitiveType().getTypeLength(); + checkArgument(typeLength > 0 && typeLength <= 16, "Expected column %s to have type length in range (1-16)", descriptor); + this.descriptor = descriptor; + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate); + } + + @Override + public void read(long[] values, int offset, int length) + { + int bytesOffset = 0; + int bytesLength = typeLength; + if (typeLength > Long.BYTES) { + bytesOffset = typeLength - Long.BYTES; + bytesLength = Long.BYTES; + } + for (int i = offset; i < offset + length; i++) { + byte[] bytes = delegate.readBytes().getBytes(); + checkBytesFitInShortDecimal(bytes, 0, bytesOffset, descriptor); + values[i] = getShortDecimalValue(bytes, bytesOffset, bytesLength); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + + private static final class LongDecimalApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + + private LongDecimalApacheParquetValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate); + } + + @Override + public void read(long[] values, int offset, int length) + { + int endOffset = (offset + length) * 2; + for (int currentOutputOffset = offset * 2; currentOutputOffset < endOffset; currentOutputOffset += 2) { + Int128 value = Int128.fromBigEndian(delegate.readBytes().getBytes()); + values[currentOutputOffset] = value.getHigh(); + values[currentOutputOffset + 1] = value.getLow(); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } + + private static final class UuidApacheParquetValueDecoder + implements ValueDecoder + { + private static final VarHandle LONG_ARRAY_HANDLE = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN); + + private final ValuesReader delegate; + + private UuidApacheParquetValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate); + } + + @Override + public void read(long[] values, int offset, int length) + { + int endOffset = (offset + length) * 2; + for (int currentOutputOffset = offset * 2; currentOutputOffset < endOffset; currentOutputOffset += 2) { + byte[] data = delegate.readBytes().getBytes(); + values[currentOutputOffset] = (long) LONG_ARRAY_HANDLE.get(data, 0); + values[currentOutputOffset + 1] = (long) LONG_ARRAY_HANDLE.get(data, Long.BYTES); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } }