# Patch summary (commentary only; everything from the first "diff --git" header onward is unchanged).
#
# Purpose: bound the size of the blocks produced by the Parquet reader.
#
# * HiveClientConfig: adds the DataSize property "hive.parquet.max-read-block-size"
#   (field parquetMaxReadBlockSize, default 16 MEGABYTE; getter annotated @NotNull).
# * HiveSessionProperties: adds the session property "parquet_max_read_block_size",
#   defaulted from the config value, plus the static accessor getParquetMaxReadBlockSize(session).
# * ParquetPageSourceFactory: reads the session value, passes it through createParquetPageSource
#   into the ParquetReader constructor, and removes eight static imports of
#   PrimitiveTypeName constants.
# * ParquetReader:
#     - the constructor takes a DataSize maxReadBlockSize (null-checked) and stores it in bytes;
#     - nextBatch() starts at INITIAL_BATCH_SIZE (1) and multiplies the next batch size by
#       BATCH_SIZE_GROWTH_FACTOR (2), capped at MAX_VECTOR_LENGTH (1024); each batch is also
#       capped by maxBatchSize and by the rows remaining in the current row group;
#     - readPrimitive() computes bytesPerCell = block size in bytes / batchSize for the column
#       just read; when that exceeds the column's recorded maximum it updates
#       maxCombinedBytesPerRow and lowers maxBatchSize to
#       max(1, maxReadBlockBytes / maxCombinedBytesPerRow). maxBatchSize is only ever reduced.
# * Tests: TestHiveClientConfig covers the default and an explicit "66kB" mapping;
#   AbstractTestParquetReader adds struct/array/map tests using a 1,000-byte limit and the
#   helpers createFixedTestArrays / createFixedTestMaps; ParquetTester adds testMaxReadBytes,
#   assertMaxReadBytes and an assertPageSource overload that asserts every page either has a
#   single position or a size no larger than the limit. Several ParquetTester signatures and
#   calls are re-wrapped one argument per line.
# * presto-parquet/pom.xml: presumably adds the io.airlift "units" dependency -- the XML tags
#   are not visible in this copy; confirm against the real patch.
#
# NOTE(review): this copy of the patch has lost its line structure (many diff lines are joined
#   on each physical line) and generic type arguments appear to have been stripped
#   (e.g. "Iterable> values"). Hunk line counts cannot be checked from this text.
# NOTE(review): the ParquetReader constructor's first parameter is split across two added lines
#   ("MessageColumnIO" / "messageColumnIO,") -- looks like an accidental line break; confirm.
# NOTE(review): createFixedTestArrays / createFixedTestMaps never append the collection that is
#   still being filled when the input runs out, so the trailing elements are dropped. The tests
#   pass the same values as written and expected, so this does not affect the comparison --
#   confirm it is intended.
# NOTE(review): testMapMaxReadBytes passes 5,000 copies of one identical key string into
#   createFixedTestMaps, which stores them in a HashMap -- presumably each map ends up with at
#   most one entry; confirm that is the intended test data.
# NOTE(review): bytesPerCell divides by batchSize rather than by the block's own position
#   count; for nested columns these may differ -- verify the estimate is as intended.
# NOTE(review): maxBytesPerCell is assigned only in the constructor within the visible hunks but
#   is not declared final, unlike maxReadBlockBytes.
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 86dc4bfb21199..884edad736426 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -104,6 +104,7 @@ public class HiveClientConfig private boolean useParquetColumnNames; private boolean failOnCorruptedParquetStatistics = true; + private DataSize parquetMaxReadBlockSize = new DataSize(16, MEGABYTE); private boolean assumeCanonicalPartitionKeys; @@ -946,6 +947,19 @@ public HiveClientConfig setFailOnCorruptedParquetStatistics(boolean failOnCorrup return this; } + @NotNull + public DataSize getParquetMaxReadBlockSize() + { + return parquetMaxReadBlockSize; + } + + @Config("hive.parquet.max-read-block-size") + public HiveClientConfig setParquetMaxReadBlockSize(DataSize parquetMaxReadBlockSize) + { + this.parquetMaxReadBlockSize = parquetMaxReadBlockSize; + return this; + } + @Deprecated public boolean isOptimizeMismatchedBucketCount() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index b30251d0adb7c..e5fd1c69eb2b3 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -65,6 +65,7 @@ public final class HiveSessionProperties private static final String RESPECT_TABLE_FORMAT = "respect_table_format"; private static final String PARQUET_USE_COLUMN_NAME = "parquet_use_column_names"; private static final String PARQUET_FAIL_WITH_CORRUPTED_STATISTICS = "parquet_fail_with_corrupted_statistics"; + private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; private static final String PARQUET_WRITER_BLOCK_SIZE = 
"parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; private static final String MAX_SPLIT_SIZE = "max_split_size"; @@ -257,6 +258,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon "Parquet: Fail when scanning Parquet files with corrupted statistics", hiveClientConfig.isFailOnCorruptedParquetStatistics(), false), + dataSizeSessionProperty( + PARQUET_MAX_READ_BLOCK_SIZE, + "Parquet: Maximum size of a block to read", + hiveClientConfig.getParquetMaxReadBlockSize(), + false), dataSizeSessionProperty( PARQUET_WRITER_BLOCK_SIZE, "Parquet: Writer block size", @@ -519,6 +525,11 @@ public static boolean isFailOnCorruptedParquetStatistics(ConnectorSession sessio return session.getProperty(PARQUET_FAIL_WITH_CORRUPTED_STATISTICS, Boolean.class); } + public static DataSize getParquetMaxReadBlockSize(ConnectorSession session) + { + return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class); + } + public static DataSize getParquetWriterBlockSize(ConnectorSession session) { return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index e6ce0e793ffc2..bb608b4810ab4 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.units.DataSize; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -66,6 +67,7 @@ import static 
com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH; +import static com.facebook.presto.hive.HiveSessionProperties.getParquetMaxReadBlockSize; import static com.facebook.presto.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics; import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames; import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName; @@ -96,14 +98,6 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; public class ParquetPageSourceFactory implements HiveBatchPageSourceFactory @@ -154,6 +148,7 @@ public Optional createPageSource( columns, isUseParquetColumnNames(session), isFailOnCorruptedParquetStatistics(session), + getParquetMaxReadBlockSize(session), typeManager, effectivePredicate, stats)); @@ -171,6 +166,7 @@ public static ParquetPageSource createParquetPageSource( List columns, boolean useParquetColumnNames, boolean failOnCorruptedParquetStatistics, + DataSize maxReadBlockSize, TypeManager typeManager, TupleDomain effectivePredicate, 
FileFormatDataSourceStats stats) @@ -219,7 +215,8 @@ public static ParquetPageSource createParquetPageSource( messageColumnIO, blocks.build(), dataSource, - systemMemoryContext); + systemMemoryContext, + maxReadBlockSize); return new ParquetPageSource( parquetReader, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index e57c96fdb7e72..5af5ba57e5c50 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -86,6 +86,7 @@ public void testDefaults() .setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE)) .setUseParquetColumnNames(false) .setFailOnCorruptedParquetStatistics(true) + .setParquetMaxReadBlockSize(new DataSize(16, Unit.MEGABYTE)) .setUseOrcColumnNames(false) .setAssumeCanonicalPartitionKeys(false) .setOrcBloomFiltersEnabled(false) @@ -180,6 +181,7 @@ public void testExplicitPropertyMappings() .put("hive.text.max-line-length", "13MB") .put("hive.parquet.use-column-names", "true") .put("hive.parquet.fail-on-corrupted-statistics", "false") + .put("hive.parquet.max-read-block-size", "66kB") .put("hive.orc.use-column-names", "true") .put("hive.orc.bloom-filters.enabled", "true") .put("hive.orc.default-bloom-filter-fpp", "0.96") @@ -269,6 +271,7 @@ public void testExplicitPropertyMappings() .setTextMaxLineLength(new DataSize(13, Unit.MEGABYTE)) .setUseParquetColumnNames(true) .setFailOnCorruptedParquetStatistics(false) + .setParquetMaxReadBlockSize(new DataSize(66, Unit.KILOBYTE)) .setUseOrcColumnNames(true) .setAssumeCanonicalPartitionKeys(true) .setOrcBloomFiltersEnabled(true) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java index 0a24e81918550..41611a3208998 100644 --- 
a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Range; import com.google.common.primitives.Shorts; +import io.airlift.units.DataSize; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector; @@ -1492,6 +1493,43 @@ private void setParquetLogging() Logger.getLogger("parquet.hadoop.ColumnChunkPageWriteStore").setLevel(Level.WARNING); } + @Test + public void testStructMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + List structValues = createTestStructs( + Collections.nCopies(500, String.join("", Collections.nCopies(33, "test"))), + Collections.nCopies(500, String.join("", Collections.nCopies(1, "test")))); + List structFieldNames = asList("a", "b"); + Type structType = RowType.from(asList(field("a", VARCHAR), field("b", VARCHAR))); + + tester.testMaxReadBytes( + getStandardStructObjectInspector(structFieldNames, asList(javaStringObjectInspector, javaStringObjectInspector)), + structValues, + structValues, + structType, + maxReadBlockSize); + } + + @Test + public void testArrayMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + Iterable> values = createFixedTestArrays(limit(cycle(asList(1, null, 3, 5, null, null, null, 7, 11, null, 13, 17)), 30_000)); + tester.testMaxReadBytes(getStandardListObjectInspector(javaIntObjectInspector), values, values, new ArrayType(INTEGER), maxReadBlockSize); + } + + @Test + public void testMapMaxReadBytes() + throws Exception + { + DataSize maxReadBlockSize = new DataSize(1_000, DataSize.Unit.BYTE); + Iterable> values = 
createFixedTestMaps(Collections.nCopies(5_000, String.join("", Collections.nCopies(33, "test"))), longsBetween(0, 5_000)); + tester.testMaxReadBytes(getStandardMapObjectInspector(javaStringObjectInspector, javaLongObjectInspector), values, values, mapType(VARCHAR, BIGINT), maxReadBlockSize); + } + private static Iterable repeatEach(int n, Iterable iterable) { return () -> new AbstractIterator() @@ -1611,6 +1649,47 @@ private Iterable> createNullableTestArrays(Iterable values) return insertNullEvery(ThreadLocalRandom.current().nextInt(2, 5), createTestArrays(values)); } + private List> createFixedTestArrays(Iterable values) + { + List> arrays = new ArrayList<>(); + Iterator valuesIter = values.iterator(); + List array = new ArrayList<>(); + int count = 1; + while (valuesIter.hasNext()) { + if (count % 10 == 0) { + arrays.add(array); + array = new ArrayList<>(); + } + if (count % 20 == 0) { + arrays.add(Collections.emptyList()); + } + array.add(valuesIter.next()); + ++count; + } + return arrays; + } + + private Iterable> createFixedTestMaps(Iterable keys, Iterable values) + { + List> maps = new ArrayList<>(); + Iterator keysIterator = keys.iterator(); + Iterator valuesIterator = values.iterator(); + Map map = new HashMap<>(); + int count = 1; + while (keysIterator.hasNext() && valuesIterator.hasNext()) { + if (count % 5 == 0) { + maps.add(map); + map = new HashMap<>(); + } + if (count % 10 == 0) { + maps.add(Collections.emptyMap()); + } + map.put(keysIterator.next(), valuesIterator.next()); + ++count; + } + return maps; + } + private Iterable> createTestMaps(Iterable keys, Iterable values) { List> maps = new ArrayList<>(); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java index 92011928657e2..dbdb0a3acf117 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java +++ 
b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/ParquetTester.java @@ -82,6 +82,7 @@ import java.util.Set; import static com.facebook.presto.hive.AbstractTestHiveFileFormats.getFieldFromCursor; +import static com.facebook.presto.hive.HiveSessionProperties.getParquetMaxReadBlockSize; import static com.facebook.presto.hive.HiveTestUtils.createTestHdfsEnvironment; import static com.facebook.presto.hive.HiveUtil.isArrayType; import static com.facebook.presto.hive.HiveUtil.isMapType; @@ -204,13 +205,27 @@ public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Itera public void testRoundTrip(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, String columnName, Type type, Optional parquetSchema) throws Exception { - testRoundTrip(singletonList(objectInspector), new Iterable[] {writeValues}, new Iterable[] {readValues}, singletonList(columnName), singletonList(type), parquetSchema, false); + testRoundTrip( + singletonList(objectInspector), + new Iterable[] {writeValues}, + new Iterable[] {readValues}, + singletonList(columnName), + singletonList(type), + parquetSchema, + false); } public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, String columnName, Type type, Optional parquetSchema) throws Exception { - testRoundTrip(singletonList(objectInspector), new Iterable[] {writeValues}, new Iterable[] {readValues}, singletonList(columnName), singletonList(type), parquetSchema, true); + testRoundTrip( + singletonList(objectInspector), + new Iterable[] {writeValues}, + new Iterable[] {readValues}, + singletonList(columnName), + singletonList(type), + parquetSchema, + true); } public void testRoundTrip(List objectInspectors, Iterable[] writeValues, Iterable[] readValues, List columnNames, List columnTypes, Optional parquetSchema, boolean singleLevelArray) @@ -223,8 +238,14 @@ public void testRoundTrip(List objectInspectors, Iterable[] 
assertRoundTrip(objectInspectors, transformToNulls(writeValues), transformToNulls(readValues), columnNames, columnTypes, parquetSchema, singleLevelArray); } - private void testRoundTripType(List objectInspectors, Iterable[] writeValues, Iterable[] readValues, - List columnNames, List columnTypes, Optional parquetSchema, boolean singleLevelArray) + private void testRoundTripType( + List objectInspectors, + Iterable[] writeValues, + Iterable[] readValues, + List columnNames, + List columnTypes, + Optional parquetSchema, + boolean singleLevelArray) throws Exception { // forward order @@ -240,7 +261,8 @@ private void testRoundTripType(List objectInspectors, Iterable< assertRoundTrip(objectInspectors, insertNullEvery(5, reverse(writeValues)), insertNullEvery(5, reverse(readValues)), columnNames, columnTypes, parquetSchema, singleLevelArray); } - void assertRoundTrip(List objectInspectors, + void assertRoundTrip( + List objectInspectors, Iterable[] writeValues, Iterable[] readValues, List columnNames, @@ -251,7 +273,8 @@ void assertRoundTrip(List objectInspectors, assertRoundTrip(objectInspectors, writeValues, readValues, columnNames, columnTypes, parquetSchema, false); } - void assertRoundTrip(List objectInspectors, + void assertRoundTrip( + List objectInspectors, Iterable[] writeValues, Iterable[] readValues, List columnNames, @@ -289,6 +312,69 @@ void assertRoundTrip(List objectInspectors, } } + void testMaxReadBytes(ObjectInspector objectInspector, Iterable writeValues, Iterable readValues, Type type, DataSize maxReadBlockSize) + throws Exception + { + assertMaxReadBytes( + singletonList(objectInspector), + new Iterable[] {writeValues}, + new Iterable[] {readValues}, + TEST_COLUMN, + singletonList(type), + Optional.empty(), + maxReadBlockSize); + } + + void assertMaxReadBytes( + List objectInspectors, + Iterable[] writeValues, + Iterable[] readValues, + List columnNames, + List columnTypes, + Optional parquetSchema, + DataSize maxReadBlockSize) + throws Exception + { 
+ WriterVersion version = PARQUET_1_0; + CompressionCodecName compressionCodecName = UNCOMPRESSED; + HiveClientConfig config = new HiveClientConfig() + .setHiveStorageFormat(HiveStorageFormat.PARQUET) + .setUseParquetColumnNames(false) + .setParquetMaxReadBlockSize(maxReadBlockSize); + ConnectorSession session = new TestingConnectorSession(new HiveSessionProperties(config, new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties()); + + try (TempFile tempFile = new TempFile("test", "parquet")) { + JobConf jobConf = new JobConf(); + jobConf.setEnum(COMPRESSION, compressionCodecName); + jobConf.setBoolean(ENABLE_DICTIONARY, true); + jobConf.setEnum(WRITER_VERSION, version); + writeParquetColumn( + jobConf, + tempFile.getFile(), + compressionCodecName, + createTableProperties(columnNames, objectInspectors), + getStandardStructObjectInspector(columnNames, objectInspectors), + getIterators(writeValues), + parquetSchema, + false); + + Iterator[] expectedValues = getIterators(readValues); + try (ConnectorPageSource pageSource = getFileFormat().createFileFormatReader( + session, + HDFS_ENVIRONMENT, + tempFile.getFile(), + columnNames, + columnTypes)) { + assertPageSource( + columnTypes, + expectedValues, + pageSource, + Optional.of(getParquetMaxReadBlockSize(session).toBytes())); + assertFalse(stream(expectedValues).allMatch(Iterator::hasNext)); + } + } + } + private static void assertFileContents( ConnectorSession session, File dataFile, @@ -314,9 +400,18 @@ private static void assertFileContents( } private static void assertPageSource(List types, Iterator[] valuesByField, ConnectorPageSource pageSource) + { + assertPageSource(types, valuesByField, pageSource, Optional.empty()); + } + + private static void assertPageSource(List types, Iterator[] valuesByField, ConnectorPageSource pageSource, Optional maxReadBlockSize) { Page page; while ((page = pageSource.getNextPage()) != null) { + if (maxReadBlockSize.isPresent()) { + 
assertTrue(page.getPositionCount() == 1 || page.getSizeInBytes() <= maxReadBlockSize.get()); + } + for (int field = 0; field < page.getChannelCount(); field++) { Block block = page.getBlock(field); for (int i = 0; i < block.getPositionCount(); i++) { @@ -420,7 +515,8 @@ private static FileFormat getFileFormat() return OPTIMIZED ? FileFormat.PRESTO_PARQUET : FileFormat.HIVE_PARQUET; } - private static DataSize writeParquetColumn(JobConf jobConf, + private static DataSize writeParquetColumn( + JobConf jobConf, File outputFile, CompressionCodecName compressionCodecName, Properties tableProperties, diff --git a/presto-parquet/pom.xml b/presto-parquet/pom.xml index 89ffff67090e3..d1d36a24afb31 100644 --- a/presto-parquet/pom.xml +++ b/presto-parquet/pom.xml @@ -32,6 +32,11 @@ hive-apache + + io.airlift + units + + io.airlift aircompressor diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java index 6b2ad777635df..9f34ca7c9a23a 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/reader/ParquetReader.java @@ -28,6 +28,7 @@ import com.facebook.presto.spi.type.MapType; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.TypeSignatureParameter; +import io.airlift.units.DataSize; import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import it.unimi.dsi.fastutil.booleans.BooleanList; import it.unimi.dsi.fastutil.ints.IntArrayList; @@ -51,6 +52,7 @@ import static com.facebook.presto.spi.type.StandardTypes.MAP; import static com.facebook.presto.spi.type.StandardTypes.ROW; import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.Math.max; import static java.lang.Math.min; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; @@ -59,6 +61,8 @@ public class 
ParquetReader implements Closeable { private static final int MAX_VECTOR_LENGTH = 1024; + private static final int INITIAL_BATCH_SIZE = 1; + private static final int BATCH_SIZE_GROWTH_FACTOR = 2; private final List blocks; private final List columns; @@ -71,21 +75,30 @@ public class ParquetReader private long currentGroupRowCount; private long nextRowInGroup; private int batchSize; + private int nextBatchSize = INITIAL_BATCH_SIZE; private final PrimitiveColumnReader[] columnReaders; + private long[] maxBytesPerCell; + private long maxCombinedBytesPerRow; + private final long maxReadBlockBytes; + private int maxBatchSize = MAX_VECTOR_LENGTH; private AggregatedMemoryContext currentRowGroupMemoryContext; - public ParquetReader(MessageColumnIO messageColumnIO, + public ParquetReader(MessageColumnIO + messageColumnIO, List blocks, ParquetDataSource dataSource, - AggregatedMemoryContext systemMemoryContext) + AggregatedMemoryContext systemMemoryContext, + DataSize maxReadBlockSize) { this.blocks = blocks; this.dataSource = requireNonNull(dataSource, "dataSource is null"); this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null"); this.currentRowGroupMemoryContext = systemMemoryContext.newAggregatedMemoryContext(); + this.maxReadBlockBytes = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null").toBytes(); columns = messageColumnIO.getLeaves(); columnReaders = new PrimitiveColumnReader[columns.size()]; + maxBytesPerCell = new long[columns.size()]; } @Override @@ -107,7 +120,9 @@ public int nextBatch() return -1; } - batchSize = toIntExact(min(MAX_VECTOR_LENGTH, currentGroupRowCount - nextRowInGroup)); + batchSize = toIntExact(min(nextBatchSize, maxBatchSize)); + nextBatchSize = min(batchSize * BATCH_SIZE_GROWTH_FACTOR, MAX_VECTOR_LENGTH); + batchSize = toIntExact(min(batchSize, currentGroupRowCount - nextRowInGroup)); nextRowInGroup += batchSize; currentPosition += batchSize; @@ -194,7 +209,8 @@ private ColumnChunk 
readPrimitive(PrimitiveField field) throws IOException { ColumnDescriptor columnDescriptor = field.getDescriptor(); - PrimitiveColumnReader columnReader = columnReaders[field.getId()]; + int fieldId = field.getId(); + PrimitiveColumnReader columnReader = columnReaders[fieldId]; if (columnReader.getPageReader() == null) { validateParquet(currentBlockMetadata.getRowCount() > 0, "Row group has 0 rows"); ColumnChunkMetaData metadata = getColumnChunkMetaData(columnDescriptor); @@ -206,7 +222,17 @@ private ColumnChunk readPrimitive(PrimitiveField field) ParquetColumnChunk columnChunk = new ParquetColumnChunk(descriptor, buffer, 0); columnReader.setPageReader(columnChunk.readAllPages()); } - return columnReader.readPrimitive(field); + ColumnChunk columnChunk = columnReader.readPrimitive(field); + + // update max size per primitive column chunk + long bytesPerCell = columnChunk.getBlock().getSizeInBytes() / batchSize; + if (maxBytesPerCell[fieldId] < bytesPerCell) { + // update batch size + maxCombinedBytesPerRow = maxCombinedBytesPerRow - maxBytesPerCell[fieldId] + bytesPerCell; + maxBatchSize = toIntExact(min(maxBatchSize, max(1, maxReadBlockBytes / maxCombinedBytesPerRow))); + maxBytesPerCell[fieldId] = bytesPerCell; + } + return columnChunk; } private byte[] allocateBlock(int length)