From 38550ac3f0f25a44f1332d3c12eb236130c92a13 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Wed, 1 Feb 2023 15:38:34 +0530 Subject: [PATCH 1/3] Optimize RLE decoder for BOOLEAN parquet type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Benchmark (encoding) Mode Cnt Before After Units BenchmarkBooleanColumnReader.read RLE thrpt 20 245.353 ± 6.927 899.504 ± 7.844 ops/s --- .../RleBitPackingHybridBooleanDecoder.java | 54 +++++++++++ .../reader/decoders/ValueDecoders.java | 6 +- .../reader/BenchmarkBooleanColumnReader.java | 91 +++++++++++++++++++ .../reader/TestColumnReaderBenchmark.java | 13 +++ .../decoders/AbstractValueDecodersTest.java | 8 ++ .../decoders/TestBooleanValueDecoders.java | 3 +- 6 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/RleBitPackingHybridBooleanDecoder.java create mode 100644 lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkBooleanColumnReader.java diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/RleBitPackingHybridBooleanDecoder.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/RleBitPackingHybridBooleanDecoder.java new file mode 100644 index 000000000000..d75638b96b9b --- /dev/null +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/RleBitPackingHybridBooleanDecoder.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader.decoders; + +import io.trino.parquet.reader.SimpleSliceInputStream; +import io.trino.parquet.reader.flat.NullsDecoder; + +/** + * Decoder for RLE encoded values of BOOLEAN primitive type + * + * Run Length Encoding / Bit-Packing Hybrid (RLE) + * + */ +public final class RleBitPackingHybridBooleanDecoder + implements ValueDecoder +{ + private NullsDecoder decoder; + + @Override + public void init(SimpleSliceInputStream input) + { + // First int is size in bytes which is not needed here + input.skip(Integer.BYTES); + this.decoder = new NullsDecoder(input.asSlice()); + } + + @Override + public void read(byte[] values, int offset, int length) + { + boolean[] buffer = new boolean[length]; + decoder.readNext(buffer, 0, length); + for (int i = 0; i < length; i++) { + // NullsDecoder returns false for 1 (non-null) and true for 0 (null) + values[offset + i] = buffer[i] ? (byte) 0 : (byte) 1; + } + } + + @Override + public void skip(int n) + { + decoder.skip(n); + } +} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java index d2061a3cab25..e77e0bd0c80e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java @@ -165,7 +165,11 @@ public static ValueDecoder getBooleanDecoder(ParquetEncoding encoding, P { return switch (encoding) { case PLAIN -> new BooleanPlainValueDecoder(); - case RLE, BIT_PACKED -> new BooleanApacheParquetValueDecoder(getApacheParquetReader(encoding, field)); + case RLE -> new RleBitPackingHybridBooleanDecoder(); + // BIT_PACKED is a deprecated encoding which should not be used anymore as per + // https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4 + // An unoptimized decoder for this encoding is provided here for compatibility with old files or non-compliant writers + case BIT_PACKED -> new BooleanApacheParquetValueDecoder(getApacheParquetReader(encoding, field)); default -> throw wrongEncoding(encoding, field); }; } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkBooleanColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkBooleanColumnReader.java new file mode 100644 index 000000000000..e0f1dd7e7b6a --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkBooleanColumnReader.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import io.trino.parquet.ParquetEncoding; +import io.trino.parquet.PrimitiveField; +import io.trino.spi.type.BooleanType; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.openjdk.jmh.annotations.Param; + +import java.util.Random; + +import static io.trino.parquet.ParquetEncoding.PLAIN; +import static io.trino.parquet.ParquetEncoding.RLE; +import static java.lang.String.format; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; + +public class BenchmarkBooleanColumnReader + extends AbstractColumnReaderBenchmark +{ + private static final Random RANDOM = new Random(23423523L); + + @Param({ + "PLAIN", + "RLE", + }) + public ParquetEncoding encoding; + + @Override + protected PrimitiveField createPrimitiveField() + { + PrimitiveType parquetType = Types.optional(BOOLEAN) + .named("name"); + return new PrimitiveField( + BooleanType.BOOLEAN, + true, + new ColumnDescriptor(new String[] {"test"}, parquetType, 0, 0), + 0); + } + + @Override + protected ValuesWriter createValuesWriter(int bufferSize) + { + if (encoding == PLAIN) { + return new BooleanPlainValuesWriter(); + } + else if (encoding == RLE) { + return new RunLengthBitPackingHybridValuesWriter(1, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + } + throw new UnsupportedOperationException(format("encoding %s is not supported", encoding)); + } + + @Override + protected void writeValue(ValuesWriter writer, boolean[] batch, int index) + { + writer.writeBoolean(batch[index]); + } + + @Override + protected boolean[] generateDataBatch(int size) + { + boolean[] batch = new boolean[size]; + for (int i = 0; i < size; i++) { + batch[i] = RANDOM.nextBoolean(); + } + return batch; + } + + public static void main(String[] args) + throws Exception + { + run(BenchmarkBooleanColumnReader.class); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java index 8fb9f6ed9bdd..31adab2c40c0 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java @@ -22,9 +22,22 @@ import static io.trino.parquet.ParquetEncoding.DELTA_BINARY_PACKED; import static io.trino.parquet.ParquetEncoding.DELTA_BYTE_ARRAY; import static io.trino.parquet.ParquetEncoding.PLAIN; +import static io.trino.parquet.ParquetEncoding.RLE; public class TestColumnReaderBenchmark { + @Test + public void testBooleanColumnReaderBenchmark() + throws IOException + { + for (ParquetEncoding encoding : ImmutableList.of(PLAIN, RLE)) { + BenchmarkBooleanColumnReader benchmark = new BenchmarkBooleanColumnReader(); + benchmark.encoding = encoding; + benchmark.setup(); + benchmark.read(); + } + } + @Test public void testByteColumnReaderBenchmark() throws IOException diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java index cfe0a0e8e451..f5d17dce6ca9 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java @@ -38,6 +38,7 @@ import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter; import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -87,6 +88,7 @@ import static org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; public abstract class AbstractValueDecodersTest @@ -358,6 +360,12 @@ private static List> generateRunnerForBatchSize(Function new BooleanPlainValuesWriter(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestBooleanValueDecoders.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestBooleanValueDecoders.java index 35da7a49c63a..48dff9c02420 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestBooleanValueDecoders.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestBooleanValueDecoders.java @@ -22,6 +22,7 @@ import java.util.Random; import static io.trino.parquet.ParquetEncoding.PLAIN; +import static io.trino.parquet.ParquetEncoding.RLE; import static io.trino.parquet.reader.TestData.generateMixedData; import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BooleanApacheParquetValueDecoder; import static io.trino.parquet.reader.flat.BooleanColumnAdapter.BOOLEAN_ADAPTER; @@ -41,7 +42,7 @@ protected Object[][] tests() BooleanApacheParquetValueDecoder::new, BOOLEAN_ADAPTER, (actual, expected) -> assertThat(actual).isEqualTo(expected)), - ImmutableList.of(PLAIN), + ImmutableList.of(PLAIN, RLE), BooleanInputProvider.values()); } From 9eb790855539c6ade898516ba08963d02bfcf624 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Thu, 2 Feb 2023 12:54:52 +0530 Subject: [PATCH 2/3] Optimize INT96 decoder in parquet reader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Benchmark Mode Cnt Before After Units BenchmarkInt96ColumnReader.read thrpt 20 22.513 ± 2.030 119.971 ± 2.384 ops/s --- .../trino/parquet/ParquetTimestampUtils.java | 7 +- .../decoders/ApacheParquetValueDecoders.java | 48 ------ .../reader/decoders/PlainValueDecoders.java | 37 ++++ .../reader/decoders/ValueDecoders.java | 6 +- .../reader/BenchmarkInt96ColumnReader.java | 90 ++++++++++ .../reader/TestColumnReaderBenchmark.java | 9 + .../decoders/AbstractValueDecodersTest.java | 3 +- .../decoders/TestInt96ValueDecoder.java | 160 ++++++++++++++++++ 8 files changed, 308 insertions(+), 52 deletions(-) create mode 100644 lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkInt96ColumnReader.java create mode 100644 lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestInt96ValueDecoder.java diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTimestampUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTimestampUtils.java index c9bc06f5e84f..61db1872c8ef 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTimestampUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetTimestampUtils.java @@ -57,9 +57,14 @@ public static DecodedTimestamp decodeInt96Timestamp(Binary timestampBinary) // little endian encoding - need to invert byte order long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]); - verify(timeOfDayNanos >= 0 && timeOfDayNanos < NANOSECONDS_PER_DAY, "Invalid timeOfDayNanos: %s", timeOfDayNanos); int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]); + return decodeInt96Timestamp(timeOfDayNanos, julianDay); + } + + public static DecodedTimestamp decodeInt96Timestamp(long timeOfDayNanos, int julianDay) + { + verify(timeOfDayNanos >= 0 && timeOfDayNanos < NANOSECONDS_PER_DAY, "Invalid timeOfDayNanos: %s", timeOfDayNanos); long epochSeconds = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * SECONDS_PER_DAY + timeOfDayNanos / NANOSECONDS_PER_SECOND; return new DecodedTimestamp(epochSeconds, (int) (timeOfDayNanos % NANOSECONDS_PER_SECOND)); } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java index 5992a719fd3b..abf647b9f719 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ApacheParquetValueDecoders.java @@ -14,7 +14,6 @@ package io.trino.parquet.reader.decoders; import io.trino.parquet.reader.SimpleSliceInputStream; -import io.trino.plugin.base.type.DecodedTimestamp; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; @@ -23,8 +22,6 @@ import java.nio.ByteBuffer; import static io.trino.parquet.ParquetReaderUtils.castToByte; -import static io.trino.parquet.ParquetTimestampUtils.decodeInt96Timestamp; -import static io.trino.parquet.reader.flat.Int96ColumnAdapter.Int96Buffer; import static java.util.Objects.requireNonNull; /** @@ -73,49 +70,4 @@ public void skip(int n) delegate.skip(n); } } - - public static final class Int96ApacheParquetValueDecoder - implements ValueDecoder - { - private final ValuesReader delegate; - - public Int96ApacheParquetValueDecoder(ValuesReader delegate) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - } - - @Override - public void init(SimpleSliceInputStream input) - { - initialize(input, delegate); - } - - @Override - public void read(Int96Buffer values, int offset, int length) - { - int endOffset = offset + length; - for (int i = offset; i < endOffset; i++) { - DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(delegate.readBytes()); - values.longs[i] = decodedTimestamp.epochSeconds(); - values.ints[i] = decodedTimestamp.nanosOfSecond(); - } - } - - @Override - public void skip(int n) - { - delegate.skip(n); - } - } - - private static void initialize(SimpleSliceInputStream input, ValuesReader reader) - { - byte[] buffer = input.readBytes(); - try { - reader.initFromPage(0, ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, 0, buffer.length))); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/PlainValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/PlainValueDecoders.java index f91034028e43..a5b9f5901149 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/PlainValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/PlainValueDecoders.java @@ -16,16 +16,21 @@ import io.airlift.slice.Slices; import io.trino.parquet.reader.SimpleSliceInputStream; import io.trino.parquet.reader.flat.BitPackingUtils; +import io.trino.plugin.base.type.DecodedTimestamp; import io.trino.spi.type.Decimals; import io.trino.spi.type.Int128; import org.apache.parquet.column.ColumnDescriptor; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_INT; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; import static io.trino.parquet.ParquetReaderUtils.toByteExact; import static io.trino.parquet.ParquetReaderUtils.toShortExact; +import static io.trino.parquet.ParquetTimestampUtils.decodeInt96Timestamp; import static io.trino.parquet.ParquetTypeUtils.checkBytesFitInShortDecimal; import static io.trino.parquet.ParquetTypeUtils.getShortDecimalValue; import static io.trino.parquet.reader.flat.BitPackingUtils.unpack; +import static io.trino.parquet.reader.flat.Int96ColumnAdapter.Int96Buffer; import static java.lang.Math.min; import static java.util.Objects.requireNonNull; import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; @@ -334,4 +339,36 @@ public void skip(int n) input.skip(n * UUID_SIZE); } } + + public static final class Int96PlainValueDecoder + implements ValueDecoder + { + private static final int LENGTH = SIZE_OF_LONG + SIZE_OF_INT; + + private SimpleSliceInputStream input; + + @Override + public void init(SimpleSliceInputStream input) + { + this.input = requireNonNull(input, "input is null"); + } + + @Override + public void read(Int96Buffer values, int offset, int length) + { + input.ensureBytesAvailable(length * LENGTH); + for (int i = offset; i < offset + length; i++) { + DecodedTimestamp timestamp = decodeInt96Timestamp(input.readLongUnsafe(), input.readIntUnsafe()); + + values.longs[i] = timestamp.epochSeconds(); + values.ints[i] = timestamp.nanosOfSecond(); + } + } + + @Override + public void skip(int n) + { + input.skip(n * LENGTH); + } + } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java index e77e0bd0c80e..c48639ffe956 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/decoders/ValueDecoders.java @@ -30,7 +30,6 @@ import static io.trino.parquet.ParquetEncoding.PLAIN; import static io.trino.parquet.ValuesType.VALUES; import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.BooleanApacheParquetValueDecoder; -import static io.trino.parquet.reader.decoders.ApacheParquetValueDecoders.Int96ApacheParquetValueDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedByteDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedIntDecoder; import static io.trino.parquet.reader.decoders.DeltaBinaryPackedDecoders.DeltaBinaryPackedLongDecoder; @@ -45,6 +44,7 @@ import static io.trino.parquet.reader.decoders.PlainByteArrayDecoders.BoundedVarcharPlainValueDecoder; import static io.trino.parquet.reader.decoders.PlainByteArrayDecoders.CharPlainValueDecoder; import static io.trino.parquet.reader.decoders.PlainValueDecoders.BooleanPlainValueDecoder; +import static io.trino.parquet.reader.decoders.PlainValueDecoders.Int96PlainValueDecoder; import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntPlainValueDecoder; import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntToBytePlainValueDecoder; import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntToShortPlainValueDecoder; @@ -177,7 +177,9 @@ public static ValueDecoder getBooleanDecoder(ParquetEncoding encoding, P public static ValueDecoder getInt96Decoder(ParquetEncoding encoding, PrimitiveField field) { if (PLAIN.equals(encoding)) { - return new Int96ApacheParquetValueDecoder(getApacheParquetReader(encoding, field)); + // INT96 type has been deprecated as per https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0 + // However, this encoding is still commonly encountered in parquet files. + return new Int96PlainValueDecoder(); } throw wrongEncoding(encoding, field); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkInt96ColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkInt96ColumnReader.java new file mode 100644 index 000000000000..33cf0def2798 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/BenchmarkInt96ColumnReader.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader; + +import io.trino.parquet.PrimitiveField; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; + +import java.time.LocalDateTime; +import java.time.Year; +import java.util.Random; + +import static io.airlift.slice.SizeOf.SIZE_OF_INT; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static io.trino.parquet.reader.TestingColumnReader.encodeInt96Timestamp; +import static io.trino.parquet.reader.flat.Int96ColumnAdapter.Int96Buffer; +import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS; +import static java.time.ZoneOffset.UTC; +import static java.time.temporal.ChronoField.NANO_OF_SECOND; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; + +public class BenchmarkInt96ColumnReader + extends AbstractColumnReaderBenchmark +{ + private static final int LENGTH = SIZE_OF_LONG + SIZE_OF_INT; + + private final Random random = new Random(56246); + + @Override + protected PrimitiveField createPrimitiveField() + { + PrimitiveType parquetType = Types.optional(INT96).named("name"); + return new PrimitiveField( + TIMESTAMP_NANOS, + true, + new ColumnDescriptor(new String[] {"test"}, parquetType, 0, 0), + 0); + } + + @Override + protected ValuesWriter createValuesWriter(int bufferSize) + { + return new FixedLenByteArrayPlainValuesWriter(LENGTH, bufferSize, bufferSize, HeapByteBufferAllocator.getInstance()); + } + + @Override + protected void writeValue(ValuesWriter writer, Int96Buffer batch, int index) + { + writer.writeBytes(encodeInt96Timestamp(batch.longs[index], batch.ints[index])); + } + + @Override + protected Int96Buffer generateDataBatch(int size) + { + Int96Buffer batch = new Int96Buffer(size); + for (int i = 0; i < size; i++) { + LocalDateTime timestamp = LocalDateTime.of( + random.nextInt(Year.MIN_VALUE, Year.MAX_VALUE + 1), + random.nextInt(1, 13), + random.nextInt(1, 29), + random.nextInt(24), + random.nextInt(60), + random.nextInt(60)); + batch.longs[i] = timestamp.toEpochSecond(UTC); + batch.ints[i] = timestamp.get(NANO_OF_SECOND); + } + return batch; + } + + public static void main(String[] args) + throws Exception + { + run(BenchmarkInt96ColumnReader.class); + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java index 31adab2c40c0..5b7220e21b6a 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReaderBenchmark.java @@ -133,4 +133,13 @@ public void testUuidColumnReaderBenchmark() benchmark.read(); } } + + @Test + public void testInt96ColumnReaderBenchmark() + throws IOException + { + BenchmarkInt96ColumnReader benchmark = new BenchmarkInt96ColumnReader(); + benchmark.setup(); + benchmark.read(); + } } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java index f5d17dce6ca9..81f9d19ccf98 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/AbstractValueDecodersTest.java @@ -371,7 +371,7 @@ private static ValuesWriter getValuesWriter(ParquetEncoding encoding, PrimitiveT case BOOLEAN -> new BooleanPlainValuesWriter(); case FIXED_LEN_BYTE_ARRAY -> new FixedLenByteArrayPlainValuesWriter(typeLength.orElseThrow(), MAX_DATA_SIZE, MAX_DATA_SIZE, HeapByteBufferAllocator.getInstance()); case BINARY, INT32, INT64, DOUBLE, FLOAT -> new PlainValuesWriter(MAX_DATA_SIZE, MAX_DATA_SIZE, HeapByteBufferAllocator.getInstance()); - default -> throw new IllegalArgumentException("PLAIN encoding writer is not supported for type " + typeName); + case INT96 -> new FixedLenByteArrayPlainValuesWriter(12, MAX_DATA_SIZE, MAX_DATA_SIZE, HeapByteBufferAllocator.getInstance()); }; } if (encoding.equals(RLE_DICTIONARY) || encoding.equals(PLAIN_DICTIONARY)) { @@ -382,6 +382,7 @@ private static ValuesWriter getValuesWriter(ParquetEncoding encoding, PrimitiveT case INT64 -> new PlainLongDictionaryValuesWriter(MAX_VALUE, RLE, Encoding.PLAIN, HeapByteBufferAllocator.getInstance()); case FLOAT -> new PlainFloatDictionaryValuesWriter(MAX_VALUE, RLE, Encoding.PLAIN, HeapByteBufferAllocator.getInstance()); case DOUBLE -> new PlainDoubleDictionaryValuesWriter(MAX_VALUE, RLE, Encoding.PLAIN, HeapByteBufferAllocator.getInstance()); + case INT96 -> new PlainFixedLenArrayDictionaryValuesWriter(MAX_VALUE, 12, RLE, Encoding.PLAIN, HeapByteBufferAllocator.getInstance()); default -> throw new IllegalArgumentException("Dictionary encoding writer is not supported for type " + typeName); }; } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestInt96ValueDecoder.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestInt96ValueDecoder.java new file mode 100644 index 000000000000..010ecfbb7990 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/decoders/TestInt96ValueDecoder.java @@ -0,0 +1,160 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.reader.decoders; + +import com.google.common.collect.ImmutableList; +import io.trino.parquet.reader.SimpleSliceInputStream; +import io.trino.plugin.base.type.DecodedTimestamp; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.ValuesWriter; + +import java.time.LocalDateTime; +import java.time.Year; +import java.util.OptionalInt; +import java.util.Random; + +import static io.trino.parquet.ParquetEncoding.PLAIN; +import static io.trino.parquet.ParquetEncoding.RLE_DICTIONARY; +import static io.trino.parquet.ParquetTimestampUtils.decodeInt96Timestamp; +import static io.trino.parquet.reader.TestingColumnReader.encodeInt96Timestamp; +import static io.trino.parquet.reader.flat.Int96ColumnAdapter.INT96_ADAPTER; +import static io.trino.parquet.reader.flat.Int96ColumnAdapter.Int96Buffer; +import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS; +import static java.time.ZoneOffset.UTC; +import static java.time.temporal.ChronoField.NANO_OF_SECOND; +import static java.util.Objects.requireNonNull; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; +import static org.assertj.core.api.Assertions.assertThat; + +public final class TestInt96ValueDecoder + extends AbstractValueDecodersTest +{ + @Override + protected Object[][] tests() + { + return testArgs( + new TestType<>( + createField(INT96, OptionalInt.empty(), TIMESTAMP_NANOS), + ValueDecoders::getInt96Decoder, + Int96ApacheParquetValueDecoder::new, + INT96_ADAPTER, + (actual, expected) -> { + assertThat(actual.longs).isEqualTo(expected.longs); + assertThat(actual.ints).isEqualTo(expected.ints); + }), + ImmutableList.of(PLAIN, RLE_DICTIONARY), + TimestampInputProvider.values()); + } + + private enum TimestampInputProvider + implements InputDataProvider + { + INT96_RANDOM { + @Override + public DataBuffer write(ValuesWriter valuesWriter, int dataSize) + { + Random random = new Random(dataSize); + long[] epochSeconds = new long[dataSize]; + int[] nanos = new int[dataSize]; + for (int i = 0; i < dataSize; i++) { + int month = random.nextInt(1, 13); + int dateMax = 30; + if (month == 2) { + dateMax = 28; + } + else if ((month < 8 && month % 2 == 1) || (month >= 8 && month % 2 == 0)) { + dateMax = 31; + } + LocalDateTime timestamp = LocalDateTime.of( + random.nextInt(Year.MIN_VALUE, Year.MAX_VALUE + 1), + month, + random.nextInt(1, dateMax + 1), + random.nextInt(24), + random.nextInt(60), + random.nextInt(60)); + epochSeconds[i] = timestamp.toEpochSecond(UTC); + nanos[i] = timestamp.get(NANO_OF_SECOND); + } + return writeValues(valuesWriter, epochSeconds, nanos); + } + }, + INT96_REPEAT { + @Override + public DataBuffer write(ValuesWriter valuesWriter, int dataSize) + { + Random random = new Random(dataSize); + LocalDateTime[] constants = new LocalDateTime[] { + LocalDateTime.MIN, + LocalDateTime.of(1410, 7, 15, 14, 30, 12), + LocalDateTime.of(1920, 8, 15, 23, 59, 59, 10020030), + LocalDateTime.of(1969, 12, 31, 23, 59, 59, 999), + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 1000000), + LocalDateTime.of(2022, 2, 3, 12, 8, 51, 1), + LocalDateTime.of(123456, 1, 2, 3, 4, 5, 678901234), + LocalDateTime.MAX}; + long[] epochSeconds = new long[dataSize]; + int[] nanos = new int[dataSize]; + for (int i = 0; i < dataSize; i++) { + LocalDateTime timestamp = constants[random.nextInt(constants.length)]; + epochSeconds[i] = timestamp.toEpochSecond(UTC); + nanos[i] = timestamp.get(NANO_OF_SECOND); + } + return writeValues(valuesWriter, epochSeconds, nanos); + } + } + } + + private static DataBuffer writeValues(ValuesWriter valuesWriter, long[] epochSeconds, int[] nanos) + { + for (int i = 0; i < epochSeconds.length; i++) { + valuesWriter.writeBytes(encodeInt96Timestamp(epochSeconds[i], nanos[i])); + } + + return getWrittenBuffer(valuesWriter); + } + + public static final class Int96ApacheParquetValueDecoder + implements ValueDecoder + { + private final ValuesReader delegate; + + public Int96ApacheParquetValueDecoder(ValuesReader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void init(SimpleSliceInputStream input) + { + initialize(input, delegate); + } + + @Override + public void read(Int96Buffer values, int offset, int length) + { + int endOffset = offset + length; + for (int i = offset; i < endOffset; i++) { + DecodedTimestamp decodedTimestamp = decodeInt96Timestamp(delegate.readBytes()); + values.longs[i] = decodedTimestamp.epochSeconds(); + values.ints[i] = decodedTimestamp.nanosOfSecond(); + } + } + + @Override + public void skip(int n) + { + delegate.skip(n); + } + } +} From 8b470282b4c1a0209ba51da50ebeeb30e7549b52 Mon Sep 17 00:00:00 2001 From: Krzysztof Skrzypczynski Date: Fri, 3 Feb 2023 00:19:42 +0530 Subject: [PATCH 3/3] Use optimized decoders to read dictionaries in parquet Co-authored-by: Raunaq Morarka --- .../parquet/dictionary/DoubleDictionary.java | 22 ++++++++++--------- .../parquet/dictionary/FloatDictionary.java | 22 ++++++++++--------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DoubleDictionary.java b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DoubleDictionary.java index 4d98a9a6b43c..3d67ba12ff17 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DoubleDictionary.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/DoubleDictionary.java @@ -14,12 +14,11 @@ package io.trino.parquet.dictionary; import io.trino.parquet.DictionaryPage; -import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader; - -import java.io.IOException; +import io.trino.parquet.reader.SimpleSliceInputStream; +import io.trino.parquet.reader.decoders.ValueDecoder; import static com.google.common.base.MoreObjects.toStringHelper; -import static io.trino.parquet.ParquetReaderUtils.toInputStream; +import static io.trino.parquet.reader.decoders.PlainValueDecoders.LongPlainValueDecoder; public class DoubleDictionary implements Dictionary @@ -27,13 +26,16 @@ public class DoubleDictionary private final double[] content; public DoubleDictionary(DictionaryPage dictionaryPage) - throws IOException { - content = new double[dictionaryPage.getDictionarySize()]; - DoublePlainValuesReader doubleReader = new DoublePlainValuesReader(); - doubleReader.initFromPage(dictionaryPage.getDictionarySize(), toInputStream(dictionaryPage)); - for (int i = 0; i < content.length; i++) { - content[i] = doubleReader.readDouble(); + int length = dictionaryPage.getDictionarySize(); + long[] buffer = new long[length]; + ValueDecoder doubleReader = new LongPlainValueDecoder(); + doubleReader.init(new SimpleSliceInputStream(dictionaryPage.getSlice())); + doubleReader.read(buffer, 0, length); + + content = new double[length]; + for (int i = 0; i < length; i++) { + content[i] = Double.longBitsToDouble(buffer[i]); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/FloatDictionary.java b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/FloatDictionary.java index 9d70b9aec798..d113f67b3cfa 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/FloatDictionary.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/dictionary/FloatDictionary.java @@ -14,12 +14,11 @@ package io.trino.parquet.dictionary; import io.trino.parquet.DictionaryPage; -import org.apache.parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader; - -import java.io.IOException; +import io.trino.parquet.reader.SimpleSliceInputStream; +import io.trino.parquet.reader.decoders.ValueDecoder; import static com.google.common.base.MoreObjects.toStringHelper; -import static io.trino.parquet.ParquetReaderUtils.toInputStream; +import static io.trino.parquet.reader.decoders.PlainValueDecoders.IntPlainValueDecoder; public class FloatDictionary implements Dictionary @@ -27,13 +26,16 @@ public class FloatDictionary private final float[] content; public FloatDictionary(DictionaryPage dictionaryPage) - throws IOException { - content = new float[dictionaryPage.getDictionarySize()]; - FloatPlainValuesReader floatReader = new FloatPlainValuesReader(); - floatReader.initFromPage(dictionaryPage.getDictionarySize(), toInputStream(dictionaryPage)); - for (int i = 0; i < content.length; i++) { - content[i] = floatReader.readFloat(); + int length = dictionaryPage.getDictionarySize(); + int[] buffer = new int[length]; + ValueDecoder floatReader = new IntPlainValueDecoder(); + floatReader.init(new SimpleSliceInputStream(dictionaryPage.getSlice())); + floatReader.read(buffer, 0, length); + + content = new float[length]; + for (int i = 0; i < length; i++) { + content[i] = Float.intBitsToFloat(buffer[i]); } }