diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java index 792d043d12013..b07965622c03f 100644 --- a/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/HiveCommonSessionProperties.java @@ -41,6 +41,8 @@ public class HiveCommonSessionProperties { @VisibleForTesting public static final String RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED = "range_filters_on_subscripts_enabled"; + @VisibleForTesting + public static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled"; private static final String NODE_SELECTION_STRATEGY = "node_selection_strategy"; private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled"; @@ -55,7 +57,6 @@ public class HiveCommonSessionProperties private static final String ORC_STREAM_BUFFER_SIZE = "orc_stream_buffer_size"; private static final String ORC_TINY_STRIPE_THRESHOLD = "orc_tiny_stripe_threshold"; private static final String ORC_ZSTD_JNI_DECOMPRESSION_ENABLED = "orc_zstd_jni_decompression_enabled"; - private static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled"; private static final String PARQUET_BATCH_READER_VERIFICATION_ENABLED = "parquet_batch_reader_verification_enabled"; private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names"; diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 026c4bab55f7c..498e28aa15d59 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ 
b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
@@ -103,6 +103,7 @@
 import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
 import static com.facebook.presto.common.type.VarcharType.VARCHAR;
 import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
+import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED;
 import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES;
 import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
 import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
@@ -1628,6 +1629,100 @@ public void testMetadataVersionsMaintainingProperties()
         }
     }
 
+    /**
+     * Supplies both settings of the Parquet batch read optimization flag, one per test invocation.
+     */
+    @DataProvider(name = "decimalVectorReader")
+    public Object[] decimalVectorReader()
+    {
+        Object[] flagSettings = {true, false};
+        return flagSettings;
+    }
+
+    /**
+     * Builds a session on top of the query runner's default session in which the Iceberg catalog
+     * session property PARQUET_BATCH_READ_OPTIMIZATION_ENABLED carries the given value.
+     */
+    private Session decimalVectorReaderEnabledSession(boolean decimalVectorReaderEnabled)
+    {
+        String propertyValue = String.valueOf(decimalVectorReaderEnabled);
+        return Session.builder(getQueryRunner().getDefaultSession())
+                .setCatalogSessionProperty(ICEBERG_CATALOG, PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, propertyValue)
+                .build();
+    }
+
+    /**
+     * Inserts four rows into decimal(5,2), decimal(16, 4) and decimal(19, 5) columns, nulls included,
+     * and expects a full scan under the supplied flag setting to return exactly the inserted rows.
+     */
+    @Test(dataProvider = "decimalVectorReader")
+    public void testDecimal(boolean decimalVectorReaderEnabled)
+    {
+        String tableName = "test_decimal_vector_reader";
+        String selectAll = "SELECT * FROM " + tableName;
+        // The same VALUES clause feeds the INSERT and serves as the expected result of the scan
+        String rows = " VALUES (cast(-1.00 as decimal(5,2)), null, cast(9999999999.123 as decimal(19, 5)))," +
+                "(cast(1.00 as decimal(5,2)), cast(121321 as decimal(16, 4)), null)," +
+                "(cast(-1.00 as decimal(5,2)), cast(-1215789.45 as decimal(16, 4)), cast(1234584.21 as decimal(19, 5)))," +
+                "(cast(1.00 as decimal(5,2)), cast(-67867878.12 as decimal(16, 4)), cast(-9999999999.123 as decimal(19, 5)))";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (short_decimal_column_int32 decimal(5,2), short_decimal_column_int64 decimal(16, 4), long_decimal_column decimal(19, 5))");
+            assertUpdate("INSERT INTO " + tableName + rows, 4);
+
+            Session session = decimalVectorReaderEnabledSession(decimalVectorReaderEnabled);
+            assertQuery(session, selectAll, rows);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
+    /**
+     * Creates a PARQUET table with boolean, int, bigint, double, real, date, timestamp, varchar,
+     * varbinary, array, map and row columns, inserts three rows (one of them all nulls), and hands
+     * SELECT * to assertQueryWithSameQueryRunner with the flag-enabled and flag-disabled sessions.
+     */
+    @Test
+    public void testAllIcebergType()
+    {
+        String tmpTableName = "test_vector_reader_all_type";
+        String createTableSql = format("" +
+                "CREATE TABLE %s ( " +
+                " c_boolean BOOLEAN, " +
+                " c_int INT," +
+                " c_bigint BIGINT, " +
+                " c_double DOUBLE, " +
+                " c_real REAL, " +
+                " c_date DATE, " +
+                " c_timestamp TIMESTAMP, " +
+                " c_varchar VARCHAR, " +
+                " c_varbinary VARBINARY, " +
+                " c_array ARRAY(BIGINT), " +
+                " c_map MAP(VARCHAR, INT), " +
+                " c_row ROW(a INT, b VARCHAR) " +
+                ") WITH (format = 'PARQUET')", tmpTableName);
+        String insertSql = format("" +
+                "INSERT INTO %s " +
+                "SELECT c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_array, c_map, c_row " +
+                "FROM ( " +
+                " VALUES " +
+                " (null, null, null, null, null, null, null, null, null, null, null, null), " +
+                " (true, INT '1245', BIGINT '1', DOUBLE '2.2', REAL '-24.124', DATE '2024-07-29', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), sequence(0, 10), MAP(ARRAY['aaa', 'bbbb'], ARRAY[1, 2]), CAST(ROW(1, 'AAA') AS ROW(a INT, b VARCHAR)))," +
+                " (false, INT '-1245', BIGINT '-1', DOUBLE '2.3', REAL '243215.435', DATE '2024-07-29', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), sequence(30, 35), MAP(ARRAY['ccc', 'bbbb'], ARRAY[-1, -2]), CAST(ROW(-1, 'AAA') AS ROW(a INT, b VARCHAR))) " +
+                ") AS x (c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_array, c_map, c_row)", tmpTableName);
+        try {
+            assertUpdate(createTableSql);
+            assertUpdate(insertSql, 3);
+
+            Session batchReadEnabled = decimalVectorReaderEnabledSession(true);
+            Session batchReadDisabled = decimalVectorReaderEnabledSession(false);
+            assertQueryWithSameQueryRunner(batchReadEnabled, "SELECT * FROM " + tmpTableName, batchReadDisabled);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tmpTableName);
+        }
+    }
+
     private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List expectedFileContent)
     {
         // check delete file list
diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java
index d806c1befe682..5d839e6be3b36 100644
--- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java
@@ -43,6 +43,7 @@
 import com.facebook.presto.parquet.batchreader.decoders.rle.BinaryRLEDictionaryValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.rle.BooleanRLEValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.rle.Int32RLEDictionaryValuesDecoder;
+import com.facebook.presto.parquet.batchreader.decoders.rle.Int32ShortDecimalRLEDictionaryValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.rle.Int64RLEDictionaryValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.rle.Int64TimestampMicrosRLEDictionaryValuesDecoder;
 import com.facebook.presto.parquet.batchreader.decoders.rle.LongDecimalRLEDictionaryValuesDecoder;
@@ -168,6 +169,9 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript
             switch (type) {
                 case INT32:
                 case FLOAT: {
+                    if (isDecimalType(columnDescriptor) && isShortDecimalType(columnDescriptor)) {
+                        return new Int32ShortDecimalRLEDictionaryValuesDecoder(bitWidth, inputStream, (IntegerDictionary) dictionary);
+                    }
                     return new Int32RLEDictionaryValuesDecoder(bitWidth, inputStream, (IntegerDictionary) dictionary);
                 }
                 case INT64: {
diff --git
a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int32ShortDecimalRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int32ShortDecimalRLEDictionaryValuesDecoder.java
new file mode 100644
index 0000000000000..43d7445d5fbee
--- /dev/null
+++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int32ShortDecimalRLEDictionaryValuesDecoder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.parquet.batchreader.decoders.rle;
+
+import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder;
+import com.facebook.presto.parquet.dictionary.IntegerDictionary;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Values decoder that exposes the {@code int[]} based decoding inherited from
+ * {@link Int32RLEDictionaryValuesDecoder} through the {@code long[]} read method of
+ * {@link ShortDecimalValuesDecoder}.
+ */
+public class Int32ShortDecimalRLEDictionaryValuesDecoder
+        extends Int32RLEDictionaryValuesDecoder
+        implements ShortDecimalValuesDecoder
+{
+    public Int32ShortDecimalRLEDictionaryValuesDecoder(int bitWidth, InputStream in, IntegerDictionary dictionary)
+    {
+        super(bitWidth, in, dictionary);
+    }
+
+    /**
+     * Decodes {@code length} values with the inherited {@code int[]} reader and stores them,
+     * widened to {@code long}, in {@code values[offset]} through {@code values[offset + length - 1]}.
+     *
+     * @param values destination array
+     * @param offset index in {@code values} of the first element written
+     * @param length number of values to decode
+     * @throws IOException if the inherited reader this method delegates to reports one
+     */
+    @Override
+    public void readNext(long[] values, int offset, int length)
+            throws IOException
+    {
+        // The inherited reader fills int arrays only, so decode into scratch space first
+        int[] decoded = new int[length];
+        super.readNext(decoded, 0, length);
+
+        int target = offset;
+        for (int value : decoded) {
+            values[target++] = value;
+        }
+    }
+}