@@ -102,6 +102,7 @@ public class HiveClientConfig

private boolean useParquetColumnNames;
private boolean failOnCorruptedParquetStatistics = true;
private DataSize parquetMaxReadBlockSize = new DataSize(16, MEGABYTE);

private boolean assumeCanonicalPartitionKeys;

@@ -919,6 +920,19 @@ public HiveClientConfig setFailOnCorruptedParquetStatistics(boolean failOnCorrup
return this;
}

@NotNull
public DataSize getParquetMaxReadBlockSize()
{
return parquetMaxReadBlockSize;
}

@Config("hive.parquet.max-read-block-size")
public HiveClientConfig setParquetMaxReadBlockSize(DataSize parquetMaxReadBlockSize)
{
this.parquetMaxReadBlockSize = parquetMaxReadBlockSize;
return this;
}

public boolean isOptimizeMismatchedBucketCount()
{
return optimizeMismatchedBucketCount;
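The new hive.parquet.max-read-block-size property defaults to 16MB and can also be overridden per query via the parquet_max_read_block_size catalog session property added below. A minimal sketch of the programmatic side, assuming the usual airlift DataSize imports; the 8MB value is arbitrary:

    import io.airlift.units.DataSize;
    import static io.airlift.units.DataSize.Unit.MEGABYTE;

    // Sketch: override the 16MB default, then read the value back the same way
    // the connector does when it builds its session properties.
    HiveClientConfig config = new HiveClientConfig()
            .setParquetMaxReadBlockSize(new DataSize(8, MEGABYTE));
    assert config.getParquetMaxReadBlockSize().toBytes() == new DataSize(8, MEGABYTE).toBytes();
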
@@ -63,6 +63,7 @@ public final class HiveSessionProperties
private static final String RESPECT_TABLE_FORMAT = "respect_table_format";
private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names";
private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String MAX_SPLIT_SIZE = "max_split_size";
@@ -232,6 +233,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Parquet: Fail when scanning Parquet files with corrupted statistics",
hiveClientConfig.isFailOnCorruptedParquetStatistics(),
false),
dataSizeSessionProperty(
PARQUET_MAX_READ_BLOCK_SIZE,
"Parquet: Maximum size of a block to read",
hiveClientConfig.getParquetMaxReadBlockSize(),
false),
dataSizeSessionProperty(
PARQUET_WRITER_BLOCK_SIZE,
"Parquet: Writer block size",
@@ -426,6 +432,11 @@ public static boolean isFailOnCorruptedParquetStatistics(ConnectorSession sessio
return session.getProperty(PARQUET_FAIL_WITH_CORRUPTED_STATISTICS, Boolean.class);
}

public static DataSize getParquetMaxReadBlockSize(ConnectorSession session)
{
return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class);
}

public static DataSize getParquetWriterBlockSize(ConnectorSession session)
{
return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class);
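For completeness, a small sketch of how the new getter is consumed from a ConnectorSession; it mirrors the TestingConnectorSession setup used in the ParquetTester changes further down, with an arbitrary 1MB override:

    // Build a session whose defaults come from HiveClientConfig, then read the
    // Parquet max read block size the same way ParquetPageSourceFactory does.
    HiveClientConfig config = new HiveClientConfig()
            .setParquetMaxReadBlockSize(new DataSize(1, MEGABYTE));
    ConnectorSession session = new TestingConnectorSession(
            new HiveSessionProperties(config, new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties());
    DataSize maxReadBlockSize = getParquetMaxReadBlockSize(session); // 1MB unless overridden per session
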
@@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.prestosql.memory.context.AggregatedMemoryContext;
import io.prestosql.parquet.ParquetCorruptionException;
import io.prestosql.parquet.ParquetDataSource;
@@ -68,6 +69,7 @@
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA;
import static io.prestosql.plugin.hive.HiveSessionProperties.getParquetMaxReadBlockSize;
import static io.prestosql.plugin.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics;
import static io.prestosql.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
import static io.prestosql.plugin.hive.HiveUtil.getDeserializerClassName;
@@ -126,6 +128,7 @@ public Optional<? extends ConnectorPageSource> createPageSource(
columns,
isUseParquetColumnNames(session),
isFailOnCorruptedParquetStatistics(session),
getParquetMaxReadBlockSize(session),
typeManager,
effectivePredicate,
stats));
@@ -143,6 +146,7 @@ public static ParquetPageSource createParquetPageSource(
List<HiveColumnHandle> columns,
boolean useParquetColumnNames,
boolean failOnCorruptedParquetStatistics,
DataSize maxReadBlockSize,
TypeManager typeManager,
TupleDomain<HiveColumnHandle> effectivePredicate,
FileFormatDataSourceStats stats)
@@ -189,7 +193,8 @@ public static ParquetPageSource createParquetPageSource(
messageColumnIO,
blocks.build(),
dataSource,
systemMemoryContext);
systemMemoryContext,
maxReadBlockSize);

return new ParquetPageSource(
parquetReader,
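ParquetPageSourceFactory now threads the session value into the ParquetReader constructor as an extra argument after the memory context. As an illustration of the kind of byte-capped batching such a limit enables (an illustrative sketch only, not the actual ParquetReader internals):

    // Illustrative only: pick a batch size so the estimated batch stays under the
    // configured max read block size, while always reading at least one row.
    static int nextBatchSize(long averageRowBytes, long maxReadBlockBytes, int maxBatchRows)
    {
        long rowsThatFit = averageRowBytes <= 0 ? maxBatchRows : maxReadBlockBytes / averageRowBytes;
        return Math.toIntExact(Math.max(1, Math.min(maxBatchRows, rowsThatFit)));
    }
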
@@ -81,6 +81,7 @@ public void testDefaults()
.setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
.setUseParquetColumnNames(false)
.setFailOnCorruptedParquetStatistics(true)
.setParquetMaxReadBlockSize(new DataSize(16, Unit.MEGABYTE))
.setUseOrcColumnNames(false)
.setAssumeCanonicalPartitionKeys(false)
.setOrcBloomFiltersEnabled(false)
@@ -165,6 +166,7 @@ public void testExplicitPropertyMappings()
.put("hive.text.max-line-length", "13MB")
.put("hive.parquet.use-column-names", "true")
.put("hive.parquet.fail-on-corrupted-statistics", "false")
.put("hive.parquet.max-read-block-size", "66kB")
.put("hive.orc.use-column-names", "true")
.put("hive.orc.bloom-filters.enabled", "true")
.put("hive.orc.default-bloom-filter-fpp", "0.96")
@@ -244,6 +246,7 @@ public void testExplicitPropertyMappings()
.setTextMaxLineLength(new DataSize(13, Unit.MEGABYTE))
.setUseParquetColumnNames(true)
.setFailOnCorruptedParquetStatistics(false)
.setParquetMaxReadBlockSize(new DataSize(66, Unit.KILOBYTE))
.setUseOrcColumnNames(true)
.setAssumeCanonicalPartitionKeys(true)
.setOrcBloomFiltersEnabled(true)
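The explicit-mapping test pairs the string "66kB" with new DataSize(66, Unit.KILOBYTE). A short sketch of the airlift DataSize round trip that this string-to-value mapping relies on (assuming DataSize.valueOf parsing, which is how airlift's config binder coerces the property):

    // "66kB" as written in the property map and the explicit DataSize are equivalent.
    DataSize parsed = DataSize.valueOf("66kB");
    DataSize explicit = new DataSize(66, DataSize.Unit.KILOBYTE);
    assert parsed.toBytes() == explicit.toBytes();
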
@@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.google.common.primitives.Shorts;
import io.airlift.units.DataSize;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.RowType;
import io.prestosql.spi.type.SqlDate;
@@ -1485,6 +1486,43 @@ protected T computeNext()
};
}

@Test
public void testStructMaxReadBytes()
throws Exception
{
DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE);
List<List> structValues = createTestStructs(
Collections.nCopies(500, String.join("", Collections.nCopies(33, "test"))),
Collections.nCopies(500, String.join("", Collections.nCopies(1, "test"))));
List<String> structFieldNames = asList("a", "b");
Type structType = RowType.from(asList(field("a", VARCHAR), field("b", VARCHAR)));

tester.testMaxReadBytes(
getStandardStructObjectInspector(structFieldNames, asList(javaStringObjectInspector, javaStringObjectInspector)),
structValues,
structValues,
structType,
maxReadBlockSize);
}

@Test
public void testArrayMaxReadBytes()
throws Exception
{
DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE);
Iterable<List<Integer>> values = createFixedTestArrays(limit(cycle(asList(1, null, 3, 5, null, null, null, 7, 11, null, 13, 17)), 30_000));
tester.testMaxReadBytes(getStandardListObjectInspector(javaIntObjectInspector), values, values, new ArrayType(INTEGER), maxReadBlockSize);
}

@Test
public void testMapMaxReadBytes()
throws Exception
{
DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE);
Iterable<Map<String, Long>> values = createFixedTestMaps(Collections.nCopies(5_000, String.join("", Collections.nCopies(33, "test"))), longsBetween(0, 5_000));
tester.testMaxReadBytes(getStandardMapObjectInspector(javaStringObjectInspector, javaLongObjectInspector), values, values, mapType(VARCHAR, BIGINT), maxReadBlockSize);
}

private static <T> Iterable<T> repeatEach(int n, Iterable<T> iterable)
{
return () -> new AbstractIterator<T>()
@@ -1604,6 +1642,47 @@ private <T> Iterable<List<T>> createNullableTestArrays(Iterable<T> values)
return insertNullEvery(ThreadLocalRandom.current().nextInt(2, 5), createTestArrays(values));
}

private <T> List<List<T>> createFixedTestArrays(Iterable<T> values)
{
List<List<T>> arrays = new ArrayList<>();
Iterator<T> valuesIter = values.iterator();
List<T> array = new ArrayList<>();
int count = 1;
while (valuesIter.hasNext()) {
if (count % 10 == 0) {
arrays.add(array);
array = new ArrayList<>();
}
if (count % 20 == 0) {
arrays.add(Collections.emptyList());
}
array.add(valuesIter.next());
++count;
}
return arrays;
}

private <K, V> Iterable<Map<K, V>> createFixedTestMaps(Iterable<K> keys, Iterable<V> values)
{
List<Map<K, V>> maps = new ArrayList<>();
Iterator<K> keysIterator = keys.iterator();
Iterator<V> valuesIterator = values.iterator();
Map<K, V> map = new HashMap<>();
int count = 1;
while (keysIterator.hasNext() && valuesIterator.hasNext()) {
if (count % 5 == 0) {
maps.add(map);
map = new HashMap<>();
}
if (count % 10 == 0) {
maps.add(Collections.emptyMap());
}
map.put(keysIterator.next(), valuesIterator.next());
++count;
}
return maps;
}

private <K, V> Iterable<Map<K, V>> createTestMaps(Iterable<K> keys, Iterable<V> values)
{
List<Map<K, V>> maps = new ArrayList<>();
@@ -85,6 +85,7 @@
import static com.google.common.collect.Iterables.transform;
import static io.airlift.units.DataSize.succinctBytes;
import static io.prestosql.plugin.hive.AbstractTestHiveFileFormats.getFieldFromCursor;
import static io.prestosql.plugin.hive.HiveSessionProperties.getParquetMaxReadBlockSize;
import static io.prestosql.plugin.hive.HiveTestUtils.createTestHdfsEnvironment;
import static io.prestosql.plugin.hive.HiveUtil.isArrayType;
import static io.prestosql.plugin.hive.HiveUtil.isMapType;
@@ -204,13 +205,27 @@ public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Itera
public void testRoundTrip(ObjectInspector objectInspector, Iterable<?> writeValues, Iterable<?> readValues, String columnName, Type type, Optional<MessageType> parquetSchema)
throws Exception
{
testRoundTrip(singletonList(objectInspector), new Iterable<?>[] {writeValues}, new Iterable<?>[] {readValues}, singletonList(columnName), singletonList(type), parquetSchema, false);
testRoundTrip(
singletonList(objectInspector),
new Iterable<?>[] {writeValues},
new Iterable<?>[] {readValues},
singletonList(columnName),
singletonList(type),
parquetSchema,
false);
}

public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Iterable<?> writeValues, Iterable<?> readValues, String columnName, Type type, Optional<MessageType> parquetSchema)
throws Exception
{
testRoundTrip(singletonList(objectInspector), new Iterable<?>[] {writeValues}, new Iterable<?>[] {readValues}, singletonList(columnName), singletonList(type), parquetSchema, true);
testRoundTrip(
singletonList(objectInspector),
new Iterable<?>[] {writeValues},
new Iterable<?>[] {readValues},
singletonList(columnName),
singletonList(type),
parquetSchema,
true);
}

public void testRoundTrip(List<ObjectInspector> objectInspectors, Iterable<?>[] writeValues, Iterable<?>[] readValues, List<String> columnNames, List<Type> columnTypes, Optional<MessageType> parquetSchema, boolean singleLevelArray)
@@ -223,8 +238,14 @@ public void testRoundTrip(List<ObjectInspector> objectInspectors, Iterable<?>[]
assertRoundTrip(objectInspectors, transformToNulls(writeValues), transformToNulls(readValues), columnNames, columnTypes, parquetSchema, singleLevelArray);
}

private void testRoundTripType(List<ObjectInspector> objectInspectors, Iterable<?>[] writeValues, Iterable<?>[] readValues,
List<String> columnNames, List<Type> columnTypes, Optional<MessageType> parquetSchema, boolean singleLevelArray)
private void testRoundTripType(
List<ObjectInspector> objectInspectors,
Iterable<?>[] writeValues,
Iterable<?>[] readValues,
List<String> columnNames,
List<Type> columnTypes,
Optional<MessageType> parquetSchema,
boolean singleLevelArray)
throws Exception
{
// forward order
@@ -240,7 +261,8 @@ private void testRoundTripType(List<ObjectInspector> objectInspectors, Iterable<
assertRoundTrip(objectInspectors, insertNullEvery(5, reverse(writeValues)), insertNullEvery(5, reverse(readValues)), columnNames, columnTypes, parquetSchema, singleLevelArray);
}

void assertRoundTrip(List<ObjectInspector> objectInspectors,
void assertRoundTrip(
List<ObjectInspector> objectInspectors,
Iterable<?>[] writeValues,
Iterable<?>[] readValues,
List<String> columnNames,
@@ -251,7 +273,8 @@ void assertRoundTrip(List<ObjectInspector> objectInspectors,
assertRoundTrip(objectInspectors, writeValues, readValues, columnNames, columnTypes, parquetSchema, false);
}

void assertRoundTrip(List<ObjectInspector> objectInspectors,
void assertRoundTrip(
List<ObjectInspector> objectInspectors,
Iterable<?>[] writeValues,
Iterable<?>[] readValues,
List<String> columnNames,
@@ -289,6 +312,69 @@ void assertRoundTrip(List<ObjectInspector> objectInspectors,
}
}

void testMaxReadBytes(ObjectInspector objectInspector, Iterable<?> writeValues, Iterable<?> readValues, Type type, DataSize maxReadBlockSize)
throws Exception
{
assertMaxReadBytes(
singletonList(objectInspector),
new Iterable<?>[] {writeValues},
new Iterable<?>[] {readValues},
TEST_COLUMN,
singletonList(type),
Optional.empty(),
maxReadBlockSize);
}

void assertMaxReadBytes(
List<ObjectInspector> objectInspectors,
Iterable<?>[] writeValues,
Iterable<?>[] readValues,
List<String> columnNames,
List<Type> columnTypes,
Optional<MessageType> parquetSchema,
DataSize maxReadBlockSize)
throws Exception
{
WriterVersion version = PARQUET_1_0;
CompressionCodecName compressionCodecName = UNCOMPRESSED;
HiveClientConfig config = new HiveClientConfig()
.setHiveStorageFormat(HiveStorageFormat.PARQUET)
.setUseParquetColumnNames(false)
.setParquetMaxReadBlockSize(maxReadBlockSize);
ConnectorSession session = new TestingConnectorSession(new HiveSessionProperties(config, new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties());

try (TempFile tempFile = new TempFile("test", "parquet")) {
JobConf jobConf = new JobConf();
jobConf.setEnum(COMPRESSION, compressionCodecName);
jobConf.setBoolean(ENABLE_DICTIONARY, true);
jobConf.setEnum(WRITER_VERSION, version);
writeParquetColumn(
jobConf,
tempFile.getFile(),
compressionCodecName,
createTableProperties(columnNames, objectInspectors),
getStandardStructObjectInspector(columnNames, objectInspectors),
getIterators(writeValues),
parquetSchema,
false);

Iterator<?>[] expectedValues = getIterators(readValues);
try (ConnectorPageSource pageSource = getFileFormat().createFileFormatReader(
session,
HDFS_ENVIRONMENT,
tempFile.getFile(),
columnNames,
columnTypes)) {
assertPageSource(
columnTypes,
expectedValues,
pageSource,
Optional.of(getParquetMaxReadBlockSize(session).toBytes()));
assertFalse(stream(expectedValues).allMatch(Iterator::hasNext));
}
}
}

private static void assertFileContents(
ConnectorSession session,
File dataFile,
@@ -314,9 +400,18 @@ private static void assertFileContents(
}

private static void assertPageSource(List<Type> types, Iterator<?>[] valuesByField, ConnectorPageSource pageSource)
{
assertPageSource(types, valuesByField, pageSource, Optional.empty());
}

private static void assertPageSource(List<Type> types, Iterator<?>[] valuesByField, ConnectorPageSource pageSource, Optional<Long> maxReadBlockSize)
{
Page page;
while ((page = pageSource.getNextPage()) != null) {
if (maxReadBlockSize.isPresent()) {
assertTrue(page.getPositionCount() == 1 || page.getSizeInBytes() <= maxReadBlockSize.get());
}

for (int field = 0; field < page.getChannelCount(); field++) {
Block block = page.getBlock(field);
for (int i = 0; i < block.getPositionCount(); i++) {
@@ -420,7 +515,8 @@ private static FileFormat getFileFormat()
return OPTIMIZED ? FileFormat.PRESTO_PARQUET : FileFormat.HIVE_PARQUET;
}

private static DataSize writeParquetColumn(JobConf jobConf,
private static DataSize writeParquetColumn(
JobConf jobConf,
File outputFile,
CompressionCodecName compressionCodecName,
Properties tableProperties,
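The essential invariant added to assertPageSource above: when a max read block size is in effect, every page coming out of the reader is either a single row or no larger than the cap. Restated as a standalone helper (a sketch; the test inlines this check):

    // Mirrors the new check in assertPageSource: single-row pages may exceed the
    // cap, everything else must fit within maxReadBlockBytes.
    static boolean withinReadBlockLimit(Page page, long maxReadBlockBytes)
    {
        return page.getPositionCount() == 1 || page.getSizeInBytes() <= maxReadBlockBytes;
    }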