diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveClientConfig.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveClientConfig.java index c37cfb98e8d4..dec334484c6d 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveClientConfig.java @@ -102,6 +102,7 @@ public class HiveClientConfig private boolean useParquetColumnNames; private boolean failOnCorruptedParquetStatistics = true; + private DataSize parquetMaxReadBlockSize = new DataSize(16, MEGABYTE); private boolean assumeCanonicalPartitionKeys; @@ -919,6 +920,19 @@ public HiveClientConfig setFailOnCorruptedParquetStatistics(boolean failOnCorrup return this; } + @NotNull + public DataSize getParquetMaxReadBlockSize() + { + return parquetMaxReadBlockSize; + } + + @Config("hive.parquet.max-read-block-size") + public HiveClientConfig setParquetMaxReadBlockSize(DataSize parquetMaxReadBlockSize) + { + this.parquetMaxReadBlockSize = parquetMaxReadBlockSize; + return this; + } + public boolean isOptimizeMismatchedBucketCount() { return optimizeMismatchedBucketCount; diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java index afc646035f30..d59235a30354 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java @@ -63,6 +63,7 @@ public final class HiveSessionProperties private static final String RESPECT_TABLE_FORMAT = "respect_table_format"; private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names"; private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics"; + private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; private static final String MAX_SPLIT_SIZE = "max_split_size"; @@ -232,6 +233,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon "Parquet: Fail when scanning Parquet files with corrupted statistics", hiveClientConfig.isFailOnCorruptedParquetStatistics(), false), + dataSizeSessionProperty( + PARQUET_MAX_READ_BLOCK_SIZE, + "Parquet: Maximum size of a block to read", + hiveClientConfig.getParquetMaxReadBlockSize(), + false), dataSizeSessionProperty( PARQUET_WRITER_BLOCK_SIZE, "Parquet: Writer block size", @@ -426,6 +432,11 @@ public static boolean isFailOnCorruptedParquetStatistics(ConnectorSession sessio return session.getProperty(PARQUET_FAIL_WITH_CORRUPTED_STATISTICS, Boolean.class); } + public static DataSize getParquetMaxReadBlockSize(ConnectorSession session) + { + return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class); + } + public static DataSize getParquetWriterBlockSize(ConnectorSession session) { return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java index b98affd14fbd..f873b3e09433 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.units.DataSize; import io.prestosql.memory.context.AggregatedMemoryContext; import io.prestosql.parquet.ParquetCorruptionException; import io.prestosql.parquet.ParquetDataSource; @@ -68,6 +69,7 @@ import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA; +import static io.prestosql.plugin.hive.HiveSessionProperties.getParquetMaxReadBlockSize; import static io.prestosql.plugin.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics; import static io.prestosql.plugin.hive.HiveSessionProperties.isUseParquetColumnNames; import static io.prestosql.plugin.hive.HiveUtil.getDeserializerClassName; @@ -126,6 +128,7 @@ public Optional createPageSource( columns, isUseParquetColumnNames(session), isFailOnCorruptedParquetStatistics(session), + getParquetMaxReadBlockSize(session), typeManager, effectivePredicate, stats)); @@ -143,6 +146,7 @@ public static ParquetPageSource createParquetPageSource( List columns, boolean useParquetColumnNames, boolean failOnCorruptedParquetStatistics, + DataSize maxReadBlockSize, TypeManager typeManager, TupleDomain effectivePredicate, FileFormatDataSourceStats stats) @@ -189,7 +193,8 @@ public static ParquetPageSource createParquetPageSource( messageColumnIO, blocks.build(), dataSource, - systemMemoryContext); + systemMemoryContext, + maxReadBlockSize); return new ParquetPageSource( parquetReader, diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveClientConfig.java index d898253a1993..f3c0c3fd8857 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveClientConfig.java @@ -81,6 +81,7 @@ public void testDefaults() .setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE)) .setUseParquetColumnNames(false) .setFailOnCorruptedParquetStatistics(true) + .setParquetMaxReadBlockSize(new DataSize(16, Unit.MEGABYTE)) .setUseOrcColumnNames(false) .setAssumeCanonicalPartitionKeys(false) .setOrcBloomFiltersEnabled(false) @@ -165,6 +166,7 @@ public void testExplicitPropertyMappings() .put("hive.text.max-line-length", "13MB") .put("hive.parquet.use-column-names", "true") .put("hive.parquet.fail-on-corrupted-statistics", "false") + .put("hive.parquet.max-read-block-size", "66kB") .put("hive.orc.use-column-names", "true") .put("hive.orc.bloom-filters.enabled", "true") .put("hive.orc.default-bloom-filter-fpp", "0.96") @@ -244,6 +246,7 @@ public void testExplicitPropertyMappings() .setTextMaxLineLength(new DataSize(13, Unit.MEGABYTE)) .setUseParquetColumnNames(true) .setFailOnCorruptedParquetStatistics(false) + .setParquetMaxReadBlockSize(new DataSize(66, Unit.KILOBYTE)) .setUseOrcColumnNames(true) .setAssumeCanonicalPartitionKeys(true) .setOrcBloomFiltersEnabled(true) diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/AbstractTestParquetReader.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/AbstractTestParquetReader.java index 35417a59d098..24b3d991625b 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/AbstractTestParquetReader.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/AbstractTestParquetReader.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Range; import com.google.common.primitives.Shorts; +import io.airlift.units.DataSize; import io.prestosql.spi.type.ArrayType; import io.prestosql.spi.type.RowType; import io.prestosql.spi.type.SqlDate; @@ -1485,6 +1486,43 @@ protected T computeNext() }; } + @Test + public void testStructMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + List structValues = createTestStructs( + Collections.nCopies(500, String.join("", Collections.nCopies(33, "test"))), + Collections.nCopies(500, String.join("", Collections.nCopies(1, "test")))); + List structFieldNames = asList("a", "b"); + Type structType = RowType.from(asList(field("a", VARCHAR), field("b", VARCHAR))); + + tester.testMaxReadBytes( + getStandardStructObjectInspector(structFieldNames, asList(javaStringObjectInspector, javaStringObjectInspector)), + structValues, + structValues, + structType, + maxReadBlockSize); + } + + @Test + public void testArrayMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + Iterable> values = createFixedTestArrays(limit(cycle(asList(1, null, 3, 5, null, null, null, 7, 11, null, 13, 17)), 30_000)); + tester.testMaxReadBytes(getStandardListObjectInspector(javaIntObjectInspector), values, values, new ArrayType(INTEGER), maxReadBlockSize); + } + + @Test + public void testMapMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + Iterable> values = createFixedTestMaps(Collections.nCopies(5_000, String.join("", Collections.nCopies(33, "test"))), longsBetween(0, 5_000)); + tester.testMaxReadBytes(getStandardMapObjectInspector(javaStringObjectInspector, javaLongObjectInspector), values, values, mapType(VARCHAR, BIGINT), maxReadBlockSize); + } + private static Iterable repeatEach(int n, Iterable iterable) { return () -> new AbstractIterator() @@ -1604,6 +1642,47 @@ private Iterable> createNullableTestArrays(Iterable values) return insertNullEvery(ThreadLocalRandom.current().nextInt(2, 5), createTestArrays(values)); } + private List> createFixedTestArrays(Iterable values) + { + List> arrays = new ArrayList<>(); + Iterator valuesIter = values.iterator(); + List array = new ArrayList<>(); + int count = 1; + while (valuesIter.hasNext()) { + if (count % 10 == 0) { + arrays.add(array); + array = new ArrayList<>(); + } + if (count % 20 == 0) { + arrays.add(Collections.emptyList()); + } + array.add(valuesIter.next()); + ++count; + } + return arrays; + } + + private Iterable> createFixedTestMaps(Iterable keys, Iterable values) + { + List> maps = new ArrayList<>(); + Iterator keysIterator = keys.iterator(); + Iterator valuesIterator = values.iterator(); + Map map = new HashMap<>(); + int count = 1; + while (keysIterator.hasNext() && valuesIterator.hasNext()) { + if (count % 5 == 0) { + maps.add(map); + map = new HashMap<>(); + } + if (count % 10 == 0) { + maps.add(Collections.emptyMap()); + } + map.put(keysIterator.next(), valuesIterator.next()); + ++count; + } + return maps; + } + private Iterable> createTestMaps(Iterable keys, Iterable values) { List> maps = new ArrayList<>(); diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java index 1b41d3e45306..83ae6ec7a9eb 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java @@ -85,6 +85,7 @@ import static com.google.common.collect.Iterables.transform; import static io.airlift.units.DataSize.succinctBytes; import static io.prestosql.plugin.hive.AbstractTestHiveFileFormats.getFieldFromCursor; +import static io.prestosql.plugin.hive.HiveSessionProperties.getParquetMaxReadBlockSize; import static io.prestosql.plugin.hive.HiveTestUtils.createTestHdfsEnvironment; import static io.prestosql.plugin.hive.HiveUtil.isArrayType; import static io.prestosql.plugin.hive.HiveUtil.isMapType; @@ -204,13 +205,27 @@ public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Itera public void testRoundTrip(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, String columnName, Type type, Optional parquetSchema) throws Exception { - testRoundTrip(singletonList(objectInspector), new Iterable[] {writeValues}, new Iterable[] {readValues}, singletonList(columnName), singletonList(type), parquetSchema, false); + testRoundTrip( + singletonList(objectInspector), + new Iterable[] {writeValues}, + new Iterable[] {readValues}, + singletonList(columnName), + singletonList(type), + parquetSchema, + false); } public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, String columnName, Type type, Optional parquetSchema) throws Exception { - testRoundTrip(singletonList(objectInspector), new Iterable[] {writeValues}, new Iterable[] {readValues}, singletonList(columnName), singletonList(type), parquetSchema, true); + testRoundTrip( + singletonList(objectInspector), + new Iterable[] {writeValues}, + new Iterable[] {readValues}, + singletonList(columnName), + singletonList(type), + parquetSchema, + true); } public void testRoundTrip(List objectInspectors, Iterable[] writeValues, Iterable[] readValues, List columnNames, List columnTypes, Optional parquetSchema, boolean singleLevelArray) @@ -223,8 +238,14 @@ public void testRoundTrip(List objectInspectors, Iterable[] assertRoundTrip(objectInspectors, transformToNulls(writeValues), transformToNulls(readValues), columnNames, columnTypes, parquetSchema, singleLevelArray); } - private void testRoundTripType(List objectInspectors, Iterable[] writeValues, Iterable[] readValues, - List columnNames, List columnTypes, Optional parquetSchema, boolean singleLevelArray) + private void testRoundTripType( + List objectInspectors, + Iterable[] writeValues, + Iterable[] readValues, + List columnNames, + List columnTypes, + Optional parquetSchema, + boolean singleLevelArray) throws Exception { // forward order @@ -240,7 +261,8 @@ private void testRoundTripType(List objectInspectors, Iterable< assertRoundTrip(objectInspectors, insertNullEvery(5, reverse(writeValues)), insertNullEvery(5, reverse(readValues)), columnNames, columnTypes, parquetSchema, singleLevelArray); } - void assertRoundTrip(List objectInspectors, + void assertRoundTrip( + List objectInspectors, Iterable[] writeValues, Iterable[] readValues, List columnNames, @@ -251,7 +273,8 @@ void assertRoundTrip(List objectInspectors, assertRoundTrip(objectInspectors, writeValues, readValues, columnNames, columnTypes, parquetSchema, false); } - void assertRoundTrip(List objectInspectors, + void assertRoundTrip( + List objectInspectors, Iterable[] writeValues, Iterable[] readValues, List columnNames, @@ -289,6 +312,69 @@ void assertRoundTrip(List objectInspectors, } } + void testMaxReadBytes(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, Type type, DataSize maxReadBlockSize) + throws Exception + { + assertMaxReadBytes( + singletonList(objectInspector), + new Iterable[] {writeValues}, + new Iterable[] {readValues}, + TEST_COLUMN, + singletonList(type), + Optional.empty(), + maxReadBlockSize); + } + + void assertMaxReadBytes( + List objectInspectors, + Iterable[] writeValues, + Iterable[] readValues, + List columnNames, + List columnTypes, + Optional parquetSchema, + DataSize maxReadBlockSize) + throws Exception + { + WriterVersion version = PARQUET_1_0; + CompressionCodecName compressionCodecName = UNCOMPRESSED; + HiveClientConfig config = new HiveClientConfig() + .setHiveStorageFormat(HiveStorageFormat.PARQUET) + .setUseParquetColumnNames(false) + .setParquetMaxReadBlockSize(maxReadBlockSize); + ConnectorSession session = new TestingConnectorSession(new HiveSessionProperties(config, new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties()); + + try (TempFile tempFile = new TempFile("test", "parquet")) { + JobConf jobConf = new JobConf(); + jobConf.setEnum(COMPRESSION, compressionCodecName); + jobConf.setBoolean(ENABLE_DICTIONARY, true); + jobConf.setEnum(WRITER_VERSION, version); + writeParquetColumn( + jobConf, + tempFile.getFile(), + compressionCodecName, + createTableProperties(columnNames, objectInspectors), + getStandardStructObjectInspector(columnNames, objectInspectors), + getIterators(writeValues), + parquetSchema, + false); + + Iterator[] expectedValues = getIterators(readValues); + try (ConnectorPageSource pageSource = getFileFormat().createFileFormatReader( + session, + HDFS_ENVIRONMENT, + tempFile.getFile(), + columnNames, + columnTypes)) { + assertPageSource( + columnTypes, + expectedValues, + pageSource, + Optional.of(getParquetMaxReadBlockSize(session).toBytes())); + assertFalse(stream(expectedValues).allMatch(Iterator::hasNext)); + } + } + } + private static void assertFileContents( ConnectorSession session, File dataFile, @@ -314,9 +400,18 @@ private static void assertFileContents( } private static void assertPageSource(List types, Iterator[] valuesByField, ConnectorPageSource pageSource) + { + assertPageSource(types, valuesByField, pageSource, Optional.empty()); + } + + private static void assertPageSource(List types, Iterator[] valuesByField, ConnectorPageSource pageSource, Optional maxReadBlockSize) { Page page; while ((page = pageSource.getNextPage()) != null) { + if (maxReadBlockSize.isPresent()) { + assertTrue(page.getPositionCount() == 1 || page.getSizeInBytes() <= maxReadBlockSize.get()); + } + for (int field = 0; field < page.getChannelCount(); field++) { Block block = page.getBlock(field); for (int i = 0; i < block.getPositionCount(); i++) { @@ -420,7 +515,8 @@ private static FileFormat getFileFormat() return OPTIMIZED ? FileFormat.PRESTO_PARQUET : FileFormat.HIVE_PARQUET; } - private static DataSize writeParquetColumn(JobConf jobConf, + private static DataSize writeParquetColumn( + JobConf jobConf, File outputFile, CompressionCodecName compressionCodecName, Properties tableProperties, diff --git a/presto-parquet/pom.xml b/presto-parquet/pom.xml index 52bdecedd84f..5f1a108a4785 100644 --- a/presto-parquet/pom.xml +++ b/presto-parquet/pom.xml @@ -37,6 +37,11 @@ aircompressor + + io.airlift + units + + com.google.guava guava diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/reader/ParquetReader.java b/presto-parquet/src/main/java/io/prestosql/parquet/reader/ParquetReader.java index 9416f9c5496c..23a75ea597a6 100644 --- a/presto-parquet/src/main/java/io/prestosql/parquet/reader/ParquetReader.java +++ b/presto-parquet/src/main/java/io/prestosql/parquet/reader/ParquetReader.java @@ -13,6 +13,7 @@ */ package io.prestosql.parquet.reader; +import io.airlift.units.DataSize; import io.prestosql.memory.context.AggregatedMemoryContext; import io.prestosql.memory.context.LocalMemoryContext; import io.prestosql.parquet.Field; @@ -51,6 +52,7 @@ import static io.prestosql.spi.type.StandardTypes.ARRAY; import static io.prestosql.spi.type.StandardTypes.MAP; import static io.prestosql.spi.type.StandardTypes.ROW; +import static java.lang.Math.max; import static java.lang.Math.min; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; @@ -59,6 +61,8 @@ public class ParquetReader implements Closeable { private static final int MAX_VECTOR_LENGTH = 1024; + private static final int INITIAL_BATCH_SIZE = 1; + private static final int BATCH_SIZE_GROWTH_FACTOR = 2; private final List blocks; private final List columns; @@ -71,21 +75,29 @@ public class ParquetReader private long currentGroupRowCount; private long nextRowInGroup; private int batchSize; + private int nextBatchSize = INITIAL_BATCH_SIZE; private final PrimitiveColumnReader[] columnReaders; + private long[] maxBytesPerCell; + private long maxCombinedBytesPerRow; + private final long maxReadBlockBytes; + private int maxBatchSize = MAX_VECTOR_LENGTH; private AggregatedMemoryContext currentRowGroupMemoryContext; public ParquetReader(MessageColumnIO messageColumnIO, List blocks, ParquetDataSource dataSource, - AggregatedMemoryContext systemMemoryContext) + AggregatedMemoryContext systemMemoryContext, + DataSize maxReadBlockSize) { this.blocks = blocks; this.dataSource = requireNonNull(dataSource, "dataSource is null"); this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null"); this.currentRowGroupMemoryContext = systemMemoryContext.newAggregatedMemoryContext(); + this.maxReadBlockBytes = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null").toBytes(); columns = messageColumnIO.getLeaves(); columnReaders = new PrimitiveColumnReader[columns.size()]; + maxBytesPerCell = new long[columns.size()]; } @Override @@ -107,7 +119,9 @@ public int nextBatch() return -1; } - batchSize = toIntExact(min(MAX_VECTOR_LENGTH, currentGroupRowCount - nextRowInGroup)); + batchSize = toIntExact(min(nextBatchSize, maxBatchSize)); + nextBatchSize = min(batchSize * BATCH_SIZE_GROWTH_FACTOR, MAX_VECTOR_LENGTH); + batchSize = toIntExact(min(batchSize, currentGroupRowCount - nextRowInGroup)); nextRowInGroup += batchSize; currentPosition += batchSize; @@ -194,7 +208,8 @@ private ColumnChunk readPrimitive(PrimitiveField field) throws IOException { ColumnDescriptor columnDescriptor = field.getDescriptor(); - PrimitiveColumnReader columnReader = columnReaders[field.getId()]; + int fieldId = field.getId(); + PrimitiveColumnReader columnReader = columnReaders[fieldId]; if (columnReader.getPageReader() == null) { validateParquet(currentBlockMetadata.getRowCount() > 0, "Row group has 0 rows"); ColumnChunkMetaData metadata = getColumnChunkMetaData(columnDescriptor); @@ -206,7 +221,17 @@ private ColumnChunk readPrimitive(PrimitiveField field) ParquetColumnChunk columnChunk = new ParquetColumnChunk(descriptor, buffer, 0); columnReader.setPageReader(columnChunk.readAllPages()); } - return columnReader.readPrimitive(field); + ColumnChunk columnChunk = columnReader.readPrimitive(field); + + // update max size per primitive column chunk + long bytesPerCell = columnChunk.getBlock().getSizeInBytes() / batchSize; + if (maxBytesPerCell[fieldId] < bytesPerCell) { + // update batch size + maxCombinedBytesPerRow = maxCombinedBytesPerRow - maxBytesPerCell[fieldId] + bytesPerCell; + maxBatchSize = toIntExact(min(maxBatchSize, max(1, maxReadBlockBytes / maxCombinedBytesPerRow))); + maxBytesPerCell[fieldId] = bytesPerCell; + } + return columnChunk; } private byte[] allocateBlock(int length)