Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class HiveClientConfig

private boolean useParquetColumnNames;
private boolean failOnCorruptedParquetStatistics = true;
private DataSize parquetMaxReadBlockSize = new DataSize(16, MEGABYTE);

private boolean assumeCanonicalPartitionKeys;

Expand Down Expand Up @@ -946,6 +947,19 @@ public HiveClientConfig setFailOnCorruptedParquetStatistics(boolean failOnCorrup
return this;
}

/**
 * Maximum amount of Parquet data a reader fetches per read.
 * Defaults to 16MB (see the field initializer).
 */
@NotNull
public DataSize getParquetMaxReadBlockSize()
{
return parquetMaxReadBlockSize;
}

/**
 * Sets the maximum Parquet read block size.
 * Bound to the {@code hive.parquet.max-read-block-size} configuration property.
 *
 * @return this, for fluent chaining (matches the other setters in this config class)
 */
@Config("hive.parquet.max-read-block-size")
public HiveClientConfig setParquetMaxReadBlockSize(DataSize parquetMaxReadBlockSize)
{
this.parquetMaxReadBlockSize = parquetMaxReadBlockSize;
return this;
}

@Deprecated
public boolean isOptimizeMismatchedBucketCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class HiveSessionProperties
private static final String RESPECT_TABLE_FORMAT = "respect_table_format";
private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names";
private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String MAX_SPLIT_SIZE = "max_split_size";
Expand Down Expand Up @@ -257,6 +258,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Parquet: Fail when scanning Parquet files with corrupted statistics",
hiveClientConfig.isFailOnCorruptedParquetStatistics(),
false),
dataSizeSessionProperty(
PARQUET_MAX_READ_BLOCK_SIZE,
"Parquet: Maximum size of a block to read",
hiveClientConfig.getParquetMaxReadBlockSize(),
false),
dataSizeSessionProperty(
PARQUET_WRITER_BLOCK_SIZE,
"Parquet: Writer block size",
Expand Down Expand Up @@ -519,6 +525,11 @@ public static boolean isFailOnCorruptedParquetStatistics(ConnectorSession sessio
return session.getProperty(PARQUET_FAIL_WITH_CORRUPTED_STATISTICS, Boolean.class);
}

/**
 * Reads the {@code parquet_max_read_block_size} session property, which caps the
 * amount of Parquet data read at once (seeded from HiveClientConfig's default).
 */
public static DataSize getParquetMaxReadBlockSize(ConnectorSession session)
{
return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class);
}

public static DataSize getParquetWriterBlockSize(ConnectorSession session)
{
return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -66,6 +67,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static com.facebook.presto.hive.HiveSessionProperties.getParquetMaxReadBlockSize;
import static com.facebook.presto.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics;
import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames;
import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
Expand Down Expand Up @@ -96,14 +98,6 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;

public class ParquetPageSourceFactory
implements HiveBatchPageSourceFactory
Expand Down Expand Up @@ -154,6 +148,7 @@ public Optional<? extends ConnectorPageSource> createPageSource(
columns,
isUseParquetColumnNames(session),
isFailOnCorruptedParquetStatistics(session),
getParquetMaxReadBlockSize(session),
typeManager,
effectivePredicate,
stats));
Expand All @@ -171,6 +166,7 @@ public static ParquetPageSource createParquetPageSource(
List<HiveColumnHandle> columns,
boolean useParquetColumnNames,
boolean failOnCorruptedParquetStatistics,
DataSize maxReadBlockSize,
TypeManager typeManager,
TupleDomain<HiveColumnHandle> effectivePredicate,
FileFormatDataSourceStats stats)
Expand Down Expand Up @@ -219,7 +215,8 @@ public static ParquetPageSource createParquetPageSource(
messageColumnIO,
blocks.build(),
dataSource,
systemMemoryContext);
systemMemoryContext,
maxReadBlockSize);

return new ParquetPageSource(
parquetReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void testDefaults()
.setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
.setUseParquetColumnNames(false)
.setFailOnCorruptedParquetStatistics(true)
.setParquetMaxReadBlockSize(new DataSize(16, Unit.MEGABYTE))
.setUseOrcColumnNames(false)
.setAssumeCanonicalPartitionKeys(false)
.setOrcBloomFiltersEnabled(false)
Expand Down Expand Up @@ -180,6 +181,7 @@ public void testExplicitPropertyMappings()
.put("hive.text.max-line-length", "13MB")
.put("hive.parquet.use-column-names", "true")
.put("hive.parquet.fail-on-corrupted-statistics", "false")
.put("hive.parquet.max-read-block-size", "66kB")
.put("hive.orc.use-column-names", "true")
.put("hive.orc.bloom-filters.enabled", "true")
.put("hive.orc.default-bloom-filter-fpp", "0.96")
Expand Down Expand Up @@ -269,6 +271,7 @@ public void testExplicitPropertyMappings()
.setTextMaxLineLength(new DataSize(13, Unit.MEGABYTE))
.setUseParquetColumnNames(true)
.setFailOnCorruptedParquetStatistics(false)
.setParquetMaxReadBlockSize(new DataSize(66, Unit.KILOBYTE))
.setUseOrcColumnNames(true)
.setAssumeCanonicalPartitionKeys(true)
.setOrcBloomFiltersEnabled(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.google.common.primitives.Shorts;
import io.airlift.units.DataSize;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector;
Expand Down Expand Up @@ -1492,6 +1493,43 @@ private void setParquetLogging()
Logger.getLogger("parquet.hadoop.ColumnChunkPageWriteStore").setLevel(Level.WARNING);
}

@Test
public void testStructMaxReadBytes()
        throws Exception
{
    // Rows of {a, b} where "a" is a long varchar (33 x "test") and "b" a short
    // one, so a 1kB read-block cap forces the reader to batch its reads.
    String longValue = String.join("", Collections.nCopies(33, "test"));
    String shortValue = String.join("", Collections.nCopies(1, "test"));
    List<List> structValues = createTestStructs(
            Collections.nCopies(500, longValue),
            Collections.nCopies(500, shortValue));

    List<String> fieldNames = asList("a", "b");
    Type rowType = RowType.from(asList(field("a", VARCHAR), field("b", VARCHAR)));
    DataSize blockSizeCap = new DataSize(1_000, DataSize.Unit.BYTE);

    tester.testMaxReadBytes(
            getStandardStructObjectInspector(fieldNames, asList(javaStringObjectInspector, javaStringObjectInspector)),
            structValues,
            structValues,
            rowType,
            blockSizeCap);
}

@Test
public void testArrayMaxReadBytes()
        throws Exception
{
    // 30k ints (with interleaved nulls) grouped into fixed-size arrays;
    // the 1kB read-block cap forces the reader to batch its reads.
    Iterable<Integer> ints = limit(cycle(asList(1, null, 3, 5, null, null, null, 7, 11, null, 13, 17)), 30_000);
    Iterable<List<Integer>> arrays = createFixedTestArrays(ints);
    DataSize blockSizeCap = new DataSize(1_000, DataSize.Unit.BYTE);

    tester.testMaxReadBytes(
            getStandardListObjectInspector(javaIntObjectInspector),
            arrays,
            arrays,
            new ArrayType(INTEGER),
            blockSizeCap);
}

@Test
public void testMapMaxReadBytes()
        throws Exception
{
    // 5k entries with long varchar keys zipped against longs;
    // the 1kB read-block cap forces the reader to batch its reads.
    Iterable<String> keys = Collections.nCopies(5_000, String.join("", Collections.nCopies(33, "test")));
    Iterable<Map<String, Long>> maps = createFixedTestMaps(keys, longsBetween(0, 5_000));
    DataSize blockSizeCap = new DataSize(1_000, DataSize.Unit.BYTE);

    tester.testMaxReadBytes(
            getStandardMapObjectInspector(javaStringObjectInspector, javaLongObjectInspector),
            maps,
            maps,
            mapType(VARCHAR, BIGINT),
            blockSizeCap);
}

private static <T> Iterable<T> repeatEach(int n, Iterable<T> iterable)
{
return () -> new AbstractIterator<T>()
Expand Down Expand Up @@ -1611,6 +1649,47 @@ private <T> Iterable<List<T>> createNullableTestArrays(Iterable<T> values)
return insertNullEvery(ThreadLocalRandom.current().nextInt(2, 5), createTestArrays(values));
}

/**
 * Slices {@code values} into deterministic arrays of at most 10 elements,
 * inserting an empty array after every second slice.
 * NOTE(review): the final partial slice is never flushed into the result;
 * this is consistent here because callers pass the same iterable as both the
 * written and the expected values.
 */
private <T> List<List<T>> createFixedTestArrays(Iterable<T> values)
{
    List<List<T>> result = new ArrayList<>();
    List<T> current = new ArrayList<>();
    int position = 1;
    for (T value : values) {
        if (position % 10 == 0) {
            result.add(current);
            current = new ArrayList<>();
        }
        if (position % 20 == 0) {
            result.add(Collections.emptyList());
        }
        current.add(value);
        position++;
    }
    return result;
}

/**
 * Zips {@code keys} with {@code values} into deterministic maps of at most
 * 5 entries, inserting an empty map after every second one; stops at the
 * shorter of the two iterables.
 * NOTE(review): the final partial map is never flushed into the result;
 * this is consistent here because callers pass the same iterable as both the
 * written and the expected values. Duplicate keys within a slice overwrite
 * earlier entries (HashMap semantics).
 */
private <K, V> Iterable<Map<K, V>> createFixedTestMaps(Iterable<K> keys, Iterable<V> values)
{
    List<Map<K, V>> result = new ArrayList<>();
    Iterator<K> keyIter = keys.iterator();
    Iterator<V> valueIter = values.iterator();
    Map<K, V> current = new HashMap<>();
    for (int position = 1; keyIter.hasNext() && valueIter.hasNext(); position++) {
        if (position % 5 == 0) {
            result.add(current);
            current = new HashMap<>();
        }
        if (position % 10 == 0) {
            result.add(Collections.emptyMap());
        }
        current.put(keyIter.next(), valueIter.next());
    }
    return result;
}

private <K, V> Iterable<Map<K, V>> createTestMaps(Iterable<K> keys, Iterable<V> values)
{
List<Map<K, V>> maps = new ArrayList<>();
Expand Down
Loading