Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class HiveCommonSessionProperties
{
@VisibleForTesting
public static final String RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED = "range_filters_on_subscripts_enabled";
@VisibleForTesting
public static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled";

private static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
Expand All @@ -55,7 +57,6 @@ public class HiveCommonSessionProperties
private static final String ORC_STREAM_BUFFER_SIZE = "orc_stream_buffer_size";
private static final String ORC_TINY_STRIPE_THRESHOLD = "orc_tiny_stripe_threshold";
private static final String ORC_ZSTD_JNI_DECOMPRESSION_ENABLED = "orc_zstd_jni_decompression_enabled";
private static final String PARQUET_BATCH_READ_OPTIMIZATION_ENABLED = "parquet_batch_read_optimization_enabled";
private static final String PARQUET_BATCH_READER_VERIFICATION_ENABLED = "parquet_batch_reader_verification_enabled";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED;
import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
Expand Down Expand Up @@ -1628,6 +1629,83 @@ public void testMetadataVersionsMaintainingProperties()
}
}

@DataProvider(name = "decimalVectorReader")
public Object[] decimalVectorReader()
{
    // Run each decimal test twice: batch-read optimization on, then off.
    Object[] toggles = {true, false};
    return toggles;
}

/**
 * Builds a session that toggles the Parquet batch-read optimization
 * ({@code parquet_batch_read_optimization_enabled}) for the Iceberg catalog.
 */
private Session decimalVectorReaderEnabledSession(boolean decimalVectorReaderEnabled)
{
    String propertyValue = Boolean.toString(decimalVectorReaderEnabled);
    return Session.builder(getQueryRunner().getDefaultSession())
            .setCatalogSessionProperty(ICEBERG_CATALOG, PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, propertyValue)
            .build();
}

@Test(dataProvider = "decimalVectorReader")
public void testDecimal(boolean decimalVectorReaderEnabled)
{
    // Exercises INT32-backed short decimals, INT64-backed short decimals, and long decimals,
    // including null values, with the batch reader toggled by the data provider.
    String table = "test_decimal_vector_reader";
    String values = " VALUES (cast(-1.00 as decimal(5,2)), null, cast(9999999999.123 as decimal(19, 5)))," +
            "(cast(1.00 as decimal(5,2)), cast(121321 as decimal(16, 4)), null)," +
            "(cast(-1.00 as decimal(5,2)), cast(-1215789.45 as decimal(16, 4)), cast(1234584.21 as decimal(19, 5)))," +
            "(cast(1.00 as decimal(5,2)), cast(-67867878.12 as decimal(16, 4)), cast(-9999999999.123 as decimal(19, 5)))";
    try {
        // Create a table with decimal column
        assertUpdate("CREATE TABLE " + table + " (short_decimal_column_int32 decimal(5,2), short_decimal_column_int64 decimal(16, 4), long_decimal_column decimal(19, 5))");

        // Insert data to table
        assertUpdate("INSERT INTO " + table + values, 4);

        Session session = decimalVectorReaderEnabledSession(decimalVectorReaderEnabled);
        assertQuery(session, "SELECT * FROM " + table, values);
    }
    finally {
        assertUpdate("DROP TABLE IF EXISTS " + table);
    }
}

@Test
public void testAllIcebergType()
{
    // Sanity check across all supported Iceberg/Parquet column types: results must be
    // identical whether the batch-read optimization is enabled or disabled.
    String tmpTableName = "test_vector_reader_all_type";
    String createSql = format("" +
            "CREATE TABLE %s ( " +
            " c_boolean BOOLEAN, " +
            " c_int INT," +
            " c_bigint BIGINT, " +
            " c_double DOUBLE, " +
            " c_real REAL, " +
            " c_date DATE, " +
            " c_timestamp TIMESTAMP, " +
            " c_varchar VARCHAR, " +
            " c_varbinary VARBINARY, " +
            " c_array ARRAY(BIGINT), " +
            " c_map MAP(VARCHAR, INT), " +
            " c_row ROW(a INT, b VARCHAR) " +
            ") WITH (format = 'PARQUET')", tmpTableName);
    String insertSql = format("" +
            "INSERT INTO %s " +
            "SELECT c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_array, c_map, c_row " +
            "FROM ( " +
            " VALUES " +
            " (null, null, null, null, null, null, null, null, null, null, null, null), " +
            " (true, INT '1245', BIGINT '1', DOUBLE '2.2', REAL '-24.124', DATE '2024-07-29', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), sequence(0, 10), MAP(ARRAY['aaa', 'bbbb'], ARRAY[1, 2]), CAST(ROW(1, 'AAA') AS ROW(a INT, b VARCHAR)))," +
            " (false, INT '-1245', BIGINT '-1', DOUBLE '2.3', REAL '243215.435', DATE '2024-07-29', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), sequence(30, 35), MAP(ARRAY['ccc', 'bbbb'], ARRAY[-1, -2]), CAST(ROW(-1, 'AAA') AS ROW(a INT, b VARCHAR))) " +
            ") AS x (c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_array, c_map, c_row)", tmpTableName);
    try {
        assertUpdate(createSql);
        assertUpdate(insertSql, 3);

        Session decimalVectorReaderEnabled = decimalVectorReaderEnabledSession(true);
        Session decimalVectorReaderDisable = decimalVectorReaderEnabledSession(false);
        assertQueryWithSameQueryRunner(decimalVectorReaderEnabled, "SELECT * FROM " + tmpTableName, decimalVectorReaderDisable);
    }
    finally {
        assertUpdate("DROP TABLE IF EXISTS " + tmpTableName);
    }
}

private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
{
// check delete file list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.facebook.presto.parquet.batchreader.decoders.rle.BinaryRLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.BooleanRLEValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.Int32RLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.Int32ShortDecimalRLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.Int64RLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.Int64TimestampMicrosRLEDictionaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.rle.LongDecimalRLEDictionaryValuesDecoder;
Expand Down Expand Up @@ -168,6 +169,9 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript
switch (type) {
case INT32:
case FLOAT: {
if (isDecimalType(columnDescriptor) && isShortDecimalType(columnDescriptor)) {
return new Int32ShortDecimalRLEDictionaryValuesDecoder(bitWidth, inputStream, (IntegerDictionary) dictionary);
}
return new Int32RLEDictionaryValuesDecoder(bitWidth, inputStream, (IntegerDictionary) dictionary);
}
case INT64: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.parquet.batchreader.decoders.rle;

import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.ShortDecimalValuesDecoder;
import com.facebook.presto.parquet.dictionary.IntegerDictionary;

import java.io.IOException;
import java.io.InputStream;

/**
 * RLE/dictionary decoder for short decimals physically stored as Parquet INT32.
 * Delegates decoding to {@link Int32RLEDictionaryValuesDecoder} and widens each
 * 32-bit value into the caller-supplied {@code long[]} output.
 */
public class Int32ShortDecimalRLEDictionaryValuesDecoder
        extends Int32RLEDictionaryValuesDecoder
        implements ShortDecimalValuesDecoder
{
    public Int32ShortDecimalRLEDictionaryValuesDecoder(int bitWidth, InputStream in, IntegerDictionary dictionary)
    {
        super(bitWidth, in, dictionary);
    }

    @Override
    public void readNext(long[] values, int offset, int length)
            throws IOException
    {
        // Decode into an int staging buffer, then widen (sign-extending) into the long output.
        int[] decoded = new int[length];
        super.readNext(decoded, 0, length);
        int outIndex = offset;
        for (int decodedValue : decoded) {
            values[outIndex++] = decodedValue;
        }
    }
}