From 2f9d264f89ef3dd23fee7bdf34f4c053f7dd96c4 Mon Sep 17 00:00:00 2001 From: Amit Chopra Date: Fri, 6 Sep 2019 10:30:58 -0700 Subject: [PATCH 1/3] Part 2 of the Parquet dynamic batch size #58 --- .../presto/hive/parquet/ParquetTester.java | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java index 92011928657e2..a7d028088199a 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java @@ -204,13 +204,27 @@ public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Itera public void testRoundTrip(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, String columnName, Type type, Optional parquetSchema) throws Exception { - testRoundTrip(singletonList(objectInspector), new Iterable[] {writeValues}, new Iterable[] {readValues}, singletonList(columnName), singletonList(type), parquetSchema, false); + testRoundTrip( + singletonList(objectInspector), + new Iterable[] {writeValues}, + new Iterable[] {readValues}, + singletonList(columnName), + singletonList(type), + parquetSchema, + false); } public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, String columnName, Type type, Optional parquetSchema) throws Exception { - testRoundTrip(singletonList(objectInspector), new Iterable[] {writeValues}, new Iterable[] {readValues}, singletonList(columnName), singletonList(type), parquetSchema, true); + testRoundTrip( + singletonList(objectInspector), + new Iterable[] {writeValues}, + new Iterable[] {readValues}, + singletonList(columnName), + singletonList(type), + parquetSchema, + true); } public void testRoundTrip(List objectInspectors, Iterable[] writeValues, Iterable[] readValues, List columnNames, List columnTypes, Optional parquetSchema, boolean singleLevelArray) @@ -223,8 +237,14 @@ public void testRoundTrip(List objectInspectors, Iterable[] assertRoundTrip(objectInspectors, transformToNulls(writeValues), transformToNulls(readValues), columnNames, columnTypes, parquetSchema, singleLevelArray); } - private void testRoundTripType(List objectInspectors, Iterable[] writeValues, Iterable[] readValues, - List columnNames, List columnTypes, Optional parquetSchema, boolean singleLevelArray) + private void testRoundTripType( + List objectInspectors, + Iterable[] writeValues, + Iterable[] readValues, + List columnNames, + List columnTypes, + Optional parquetSchema, + boolean singleLevelArray) throws Exception { // forward order @@ -420,7 +440,8 @@ private static FileFormat getFileFormat() return OPTIMIZED ? FileFormat.PRESTO_PARQUET : FileFormat.HIVE_PARQUET; } - private static DataSize writeParquetColumn(JobConf jobConf, + private static DataSize writeParquetColumn( + JobConf jobConf, File outputFile, CompressionCodecName compressionCodecName, Properties tableProperties, From 03608b942ca447a395e040da8e4aa093a0f39e2b Mon Sep 17 00:00:00 2001 From: Amit Chopra Date: Tue, 3 Sep 2019 14:27:26 -0700 Subject: [PATCH 2/3] Merging Commit "Add dynamic batch sizing in Parquet reader" for Parquet dynamic batch size #58 --- .../presto/hive/HiveClientConfig.java | 14 ++++ .../presto/hive/HiveSessionProperties.java | 11 +++ .../parquet/ParquetPageSourceFactory.java | 7 +- .../presto/hive/TestHiveClientConfig.java | 3 + .../parquet/AbstractTestParquetReader.java | 79 +++++++++++++++++++ .../presto/hive/parquet/ParquetTester.java | 63 +++++++++++++++ presto-parquet/pom.xml | 5 ++ .../presto/parquet/reader/ParquetReader.java | 34 +++++++- 8 files changed, 211 insertions(+), 5 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 86dc4bfb21199..884edad736426 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -104,6 +104,7 @@ public class HiveClientConfig private boolean useParquetColumnNames; private boolean failOnCorruptedParquetStatistics = true; + private DataSize parquetMaxReadBlockSize = new DataSize(16, MEGABYTE); private boolean assumeCanonicalPartitionKeys; @@ -946,6 +947,19 @@ public HiveClientConfig setFailOnCorruptedParquetStatistics(boolean failOnCorrup return this; } + @NotNull + public DataSize getParquetMaxReadBlockSize() + { + return parquetMaxReadBlockSize; + } + + @Config("hive.parquet.max-read-block-size") + public HiveClientConfig setParquetMaxReadBlockSize(DataSize parquetMaxReadBlockSize) + { + this.parquetMaxReadBlockSize = parquetMaxReadBlockSize; + return this; + } + @Deprecated public boolean isOptimizeMismatchedBucketCount() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 1018fe0a5a97a..d8e9cba900973 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -65,6 +65,7 @@ public final class HiveSessionProperties private static final String RESPECT_TABLE_FORMAT = "respect_table_format"; private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names"; private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics"; + private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; private static final String MAX_SPLIT_SIZE = "max_split_size"; @@ -256,6 +257,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon "Parquet: Fail when scanning Parquet files with corrupted statistics", hiveClientConfig.isFailOnCorruptedParquetStatistics(), false), + dataSizeSessionProperty( + PARQUET_MAX_READ_BLOCK_SIZE, + "Parquet: Maximum size of a block to read", + hiveClientConfig.getParquetMaxReadBlockSize(), + false), dataSizeSessionProperty( PARQUET_WRITER_BLOCK_SIZE, "Parquet: Writer block size", @@ -513,6 +519,11 @@ public static boolean isFailOnCorruptedParquetStatistics(ConnectorSession sessio return session.getProperty(PARQUET_FAIL_WITH_CORRUPTED_STATISTICS, Boolean.class); } + public static DataSize getParquetMaxReadBlockSize(ConnectorSession session) + { + return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class); + } + public static DataSize getParquetWriterBlockSize(ConnectorSession session) { return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index 4530c1b1dbfa1..164e64fd41c7f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.units.DataSize; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -61,6 +62,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; +import static com.facebook.presto.hive.HiveSessionProperties.getParquetMaxReadBlockSize; import static com.facebook.presto.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics; import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames; import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName; @@ -126,6 +128,7 @@ public Optional createPageSource( columns, isUseParquetColumnNames(session), isFailOnCorruptedParquetStatistics(session), + getParquetMaxReadBlockSize(session), typeManager, effectivePredicate, stats)); @@ -143,6 +146,7 @@ public static ParquetPageSource createParquetPageSource( List columns, boolean useParquetColumnNames, boolean failOnCorruptedParquetStatistics, + DataSize maxReadBlockSize, TypeManager typeManager, TupleDomain effectivePredicate, FileFormatDataSourceStats stats) @@ -189,7 +193,8 @@ public static ParquetPageSource createParquetPageSource( messageColumnIO, blocks.build(), dataSource, - systemMemoryContext); + systemMemoryContext, + maxReadBlockSize); return new ParquetPageSource( parquetReader, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index e57c96fdb7e72..5af5ba57e5c50 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -86,6 +86,7 @@ public void testDefaults() .setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE)) .setUseParquetColumnNames(false) .setFailOnCorruptedParquetStatistics(true) + .setParquetMaxReadBlockSize(new DataSize(16, Unit.MEGABYTE)) .setUseOrcColumnNames(false) .setAssumeCanonicalPartitionKeys(false) .setOrcBloomFiltersEnabled(false) @@ -180,6 +181,7 @@ public void testExplicitPropertyMappings() .put("hive.text.max-line-length", "13MB") .put("hive.parquet.use-column-names", "true") .put("hive.parquet.fail-on-corrupted-statistics", "false") + .put("hive.parquet.max-read-block-size", "66kB") .put("hive.orc.use-column-names", "true") .put("hive.orc.bloom-filters.enabled", "true") .put("hive.orc.default-bloom-filter-fpp", "0.96") @@ -269,6 +271,7 @@ public void testExplicitPropertyMappings() .setTextMaxLineLength(new DataSize(13, Unit.MEGABYTE)) .setUseParquetColumnNames(true) .setFailOnCorruptedParquetStatistics(false) + .setParquetMaxReadBlockSize(new DataSize(66, Unit.KILOBYTE)) .setUseOrcColumnNames(true) .setAssumeCanonicalPartitionKeys(true) .setOrcBloomFiltersEnabled(true) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java index 0a24e81918550..11dbdc89934b2 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Range; import com.google.common.primitives.Shorts; +import io.airlift.units.DataSize; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector; @@ -1482,6 +1483,43 @@ protected T computeNext() }; } + @Test + public void testStructMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + List structValues = createTestStructs( + Collections.nCopies(500, String.join("", Collections.nCopies(33, "test"))), + Collections.nCopies(500, String.join("", Collections.nCopies(1, "test")))); + List structFieldNames = asList("a", "b"); + Type structType = RowType.from(asList(field("a", VARCHAR), field("b", VARCHAR))); + + tester.testMaxReadBytes( + getStandardStructObjectInspector(structFieldNames, asList(javaStringObjectInspector, javaStringObjectInspector)), + structValues, + structValues, + structType, + maxReadBlockSize); + } + + @Test + public void testArrayMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + Iterable> values = createFixedTestArrays(limit(cycle(asList(1, null, 3, 5, null, null, null, 7, 11, null, 13, 17)), 30_000)); + tester.testMaxReadBytes(getStandardListObjectInspector(javaIntObjectInspector), values, values, new ArrayType(INTEGER), maxReadBlockSize); + } + + @Test + public void testMapMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + Iterable> values = createFixedTestMaps(Collections.nCopies(5_000, String.join("", Collections.nCopies(33, "test"))), longsBetween(0, 5_000)); + tester.testMaxReadBytes(getStandardMapObjectInspector(javaStringObjectInspector, javaLongObjectInspector), values, values, mapType(VARCHAR, BIGINT), maxReadBlockSize); + } + // parquet has excessive logging at INFO level, set them to WARNING private void setParquetLogging() { @@ -1611,6 +1649,47 @@ private Iterable> createNullableTestArrays(Iterable values) return insertNullEvery(ThreadLocalRandom.current().nextInt(2, 5), createTestArrays(values)); } + private List> createFixedTestArrays(Iterable values) + { + List> arrays = new ArrayList<>(); + Iterator valuesIter = values.iterator(); + List array = new ArrayList<>(); + int count = 1; + while (valuesIter.hasNext()) { + if (count % 10 == 0) { + arrays.add(array); + array = new ArrayList<>(); + } + if (count % 20 == 0) { + arrays.add(Collections.emptyList()); + } + array.add(valuesIter.next()); + ++count; + } + return arrays; + } + + private Iterable> createFixedTestMaps(Iterable keys, Iterable values) + { + List> maps = new ArrayList<>(); + Iterator keysIterator = keys.iterator(); + Iterator valuesIterator = values.iterator(); + Map map = new HashMap<>(); + int count = 1; + while (keysIterator.hasNext() && valuesIterator.hasNext()) { + if (count % 5 == 0) { + maps.add(map); + map = new HashMap<>(); + } + if (count % 10 == 0) { + maps.add(Collections.emptyMap()); + } + map.put(keysIterator.next(), valuesIterator.next()); + ++count; + } + return maps; + } + private Iterable> createTestMaps(Iterable keys, Iterable values) { List> maps = new ArrayList<>(); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java index a7d028088199a..56f2ba46609f0 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java @@ -82,6 +82,7 @@ import java.util.Set; import static com.facebook.presto.hive.AbstractTestHiveFileFormats.getFieldFromCursor; +import static com.facebook.presto.hive.HiveSessionProperties.getParquetMaxReadBlockSize; import static com.facebook.presto.hive.HiveTestUtils.createTestHdfsEnvironment; import static com.facebook.presto.hive.HiveUtil.isArrayType; import static com.facebook.presto.hive.HiveUtil.isMapType; @@ -309,6 +310,60 @@ void assertRoundTrip(List objectInspectors, } } + void testMaxReadBytes(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, Type type, DataSize maxReadBlockSize) + throws Exception + { + assertMaxReadBytes(singletonList(objectInspector), new Iterable[] {writeValues}, new Iterable[] { + readValues}, TEST_COLUMN, singletonList(type), Optional.empty(), maxReadBlockSize); + } + + void assertMaxReadBytes( + List objectInspectors, + Iterable[] writeValues, + Iterable[] readValues, + List columnNames, + List columnTypes, + Optional parquetSchema, + DataSize maxReadBlockSize) + throws Exception + { + WriterVersion version = PARQUET_1_0; + CompressionCodecName compressionCodecName = UNCOMPRESSED; + HiveClientConfig config = new HiveClientConfig().setHiveStorageFormat(HiveStorageFormat.PARQUET).setUseParquetColumnNames(false).setParquetMaxReadBlockSize(maxReadBlockSize); + ConnectorSession session = new TestingConnectorSession(new HiveSessionProperties(config, new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties()); + + try (TempFile tempFile = new TempFile("test", "parquet")) { + JobConf jobConf = new JobConf(); + jobConf.setEnum(COMPRESSION, compressionCodecName); + jobConf.setBoolean(ENABLE_DICTIONARY, true); + jobConf.setEnum(WRITER_VERSION, version); + writeParquetColumn( + jobConf, + tempFile.getFile(), + compressionCodecName, + createTableProperties(columnNames, objectInspectors), + getStandardStructObjectInspector(columnNames, objectInspectors), + getIterators(writeValues), + parquetSchema, + false); + + Iterator[] expectedValues = getIterators(readValues); + try (ConnectorPageSource pageSource = getFileFormat().createFileFormatReader( + session, + HDFS_ENVIRONMENT, + tempFile.getFile(), + columnNames, + columnTypes)) { + assertPageSource( + columnTypes, + expectedValues, + pageSource, + Optional.of(getParquetMaxReadBlockSize(session).toBytes())); + assertFalse(stream(expectedValues).allMatch(Iterator::hasNext)); + } + } + } + private static void assertFileContents( ConnectorSession session, File dataFile, @@ -334,9 +389,17 @@ private static void assertFileContents( } private static void assertPageSource(List types, Iterator[] valuesByField, ConnectorPageSource pageSource) + { + assertPageSource(types, valuesByField, pageSource, Optional.empty()); + } + + private static void assertPageSource(List types, Iterator[] valuesByField, ConnectorPageSource pageSource, Optional maxReadBlockSize) { Page page; while ((page = pageSource.getNextPage()) != null) { + if (maxReadBlockSize.isPresent()) { + assertTrue(page.getPositionCount() == 1 || page.getSizeInBytes() <= maxReadBlockSize.get()); + } for (int field = 0; field < page.getChannelCount(); field++) { Block block = page.getBlock(field); for (int i = 0; i < block.getPositionCount(); i++) { diff --git a/presto-parquet/pom.xml b/presto-parquet/pom.xml index f8246d5d95fcb..a5554fc13fa24 100644 --- a/presto-parquet/pom.xml +++ b/presto-parquet/pom.xml @@ -37,6 +37,11 @@ aircompressor + + io.airlift + units + + com.google.guava guava diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java index 8e06f91fe561a..3c28461381cfb 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.parquet.reader; +import io.airlift.units.DataSize; import com.facebook.presto.memory.context.AggregatedMemoryContext; import com.facebook.presto.memory.context.LocalMemoryContext; import com.facebook.presto.parquet.Field; @@ -50,6 +51,7 @@ import static com.facebook.presto.spi.type.StandardTypes.ARRAY; import static com.facebook.presto.spi.type.StandardTypes.MAP; import static com.facebook.presto.spi.type.StandardTypes.ROW; +import static java.lang.Math.max; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.min; import static java.lang.Math.toIntExact; @@ -59,6 +61,9 @@ public class ParquetReader implements Closeable { private static final int MAX_VECTOR_LENGTH = 1024; + private static final int INITIAL_BATCH_SIZE = 1; + private static final int BATCH_SIZE_GROWTH_FACTOR = 2; + private final List blocks; private final List columns; @@ -71,21 +76,29 @@ public class ParquetReader private long currentGroupRowCount; private long nextRowInGroup; private int batchSize; + private int nextBatchSize = INITIAL_BATCH_SIZE; private final PrimitiveColumnReader[] columnReaders; + private long[] maxBytesPerCell; + private long maxCombinedBytesPerRow; + private final long maxReadBlockBytes; + private int maxBatchSize = MAX_VECTOR_LENGTH; private AggregatedMemoryContext currentRowGroupMemoryContext; public ParquetReader(MessageColumnIO messageColumnIO, List blocks, ParquetDataSource dataSource, - AggregatedMemoryContext systemMemoryContext) + AggregatedMemoryContext systemMemoryContext, + DataSize maxReadBlockSize) { this.blocks = blocks; this.dataSource = requireNonNull(dataSource, "dataSource is null"); this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null"); this.currentRowGroupMemoryContext = systemMemoryContext.newAggregatedMemoryContext(); + this.maxReadBlockBytes = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null").toBytes(); columns = messageColumnIO.getLeaves(); columnReaders = new PrimitiveColumnReader[columns.size()]; + maxBytesPerCell = new long[columns.size()]; } @Override @@ -107,7 +120,9 @@ public int nextBatch() return -1; } - batchSize = toIntExact(min(MAX_VECTOR_LENGTH, currentGroupRowCount - nextRowInGroup)); + batchSize = toIntExact(min(nextBatchSize, maxBatchSize)); + nextBatchSize = min(batchSize * BATCH_SIZE_GROWTH_FACTOR, MAX_VECTOR_LENGTH); + batchSize = toIntExact(min(batchSize, currentGroupRowCount - nextRowInGroup)); nextRowInGroup += batchSize; currentPosition += batchSize; @@ -194,7 +209,8 @@ private ColumnChunk readPrimitive(PrimitiveField field) throws IOException { ColumnDescriptor columnDescriptor = field.getDescriptor(); - PrimitiveColumnReader columnReader = columnReaders[field.getId()]; + int fieldId = field.getId(); + PrimitiveColumnReader columnReader = columnReaders[fieldId]; if (columnReader.getPageReader() == null) { validateParquet(currentBlockMetadata.getRowCount() > 0, "Row group has 0 rows"); ColumnChunkMetaData metadata = getColumnChunkMetaData(columnDescriptor); @@ -206,7 +222,17 @@ private ColumnChunk readPrimitive(PrimitiveField field) ParquetColumnChunk columnChunk = new ParquetColumnChunk(descriptor, buffer, 0); columnReader.setPageReader(columnChunk.readAllPages()); } - return columnReader.readPrimitive(field); + ColumnChunk columnChunk = columnReader.readPrimitive(field); + + // update max size per primitive column chunk + long bytesPerCell = columnChunk.getBlock().getSizeInBytes() / batchSize; + if (maxBytesPerCell[fieldId] < bytesPerCell) { + // update batch size + maxCombinedBytesPerRow = maxCombinedBytesPerRow - maxBytesPerCell[fieldId] + bytesPerCell; + maxBatchSize = toIntExact(min(maxBatchSize, max(1, maxReadBlockBytes / maxCombinedBytesPerRow))); + maxBytesPerCell[fieldId] = bytesPerCell; + } + return columnChunk; } private byte[] allocateBlock(int length) From dec4f60a9d830fe4092ec608c68c0be617d17870 Mon Sep 17 00:00:00 2001 From: Amit Chopra Date: Tue, 10 Sep 2019 14:11:59 -0700 Subject: [PATCH 3/3] Resolving a merge conflict --- .../presto/hive/parquet/ParquetPageSourceFactory.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index 9d4ae7d5debb3..1f270b88430cf 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -98,14 +98,6 @@ import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; public class ParquetPageSourceFactory implements HiveBatchPageSourceFactory