Skip to content

Commit 1e0760a

Browse files
authored
PARQUET-1414: Limit page size based on maximum row count (#531)
1 parent e7db9e2 commit 1e0760a

File tree

5 files changed

+128
-5
lines changed

5 files changed

+128
-5
lines changed

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class ParquetProperties {
4848
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
4949
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
5050
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
51+
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
5152

5253
public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
5354

@@ -85,10 +86,11 @@ public static WriterVersion fromString(String name) {
8586
private final ByteBufferAllocator allocator;
8687
private final ValuesWriterFactory valuesWriterFactory;
8788
private final int columnIndexTruncateLength;
89+
private final int pageRowCountLimit;
8890

8991
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
9092
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
91-
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
93+
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
9294
this.pageSizeThreshold = pageSize;
9395
this.initialSlabSize = CapacityByteArrayOutputStream
9496
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -102,6 +104,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
102104

103105
this.valuesWriterFactory = writerFactory;
104106
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
107+
this.pageRowCountLimit = pageRowCountLimit;
105108
}
106109

107110
public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -194,6 +197,10 @@ public boolean estimateNextSizeCheck() {
194197
return estimateNextSizeCheck;
195198
}
196199

200+
public int getPageRowCountLimit() {
201+
return pageRowCountLimit;
202+
}
203+
197204
public static Builder builder() {
198205
return new Builder();
199206
}
@@ -213,18 +220,22 @@ public static class Builder {
213220
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
214221
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
215222
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
223+
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
216224

217225
private Builder() {
218226
}
219227

220228
private Builder(ParquetProperties toCopy) {
229+
this.pageSize = toCopy.pageSizeThreshold;
221230
this.enableDict = toCopy.enableDictionary;
222231
this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
223232
this.writerVersion = toCopy.writerVersion;
224233
this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
225234
this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
226235
this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
236+
this.valuesWriterFactory = toCopy.valuesWriterFactory;
227237
this.allocator = toCopy.allocator;
238+
this.pageRowCountLimit = toCopy.pageRowCountLimit;
228239
}
229240

230241
/**
@@ -313,11 +324,17 @@ public Builder withColumnIndexTruncateLength(int length) {
313324
return this;
314325
}
315326

327+
public Builder withPageRowCountLimit(int rowCount) {
328+
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
329+
pageRowCountLimit = rowCount;
330+
return this;
331+
}
332+
316333
public ParquetProperties build() {
317334
ParquetProperties properties =
318335
new ParquetProperties(writerVersion, pageSize, dictPageSize,
319336
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
320-
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
337+
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
321338
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
322339
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
323340
// we'd like to decouple that and won't need to pass an object to properties and then pass the

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ private interface ColumnWriterProvider {
6767

6868
this.columns = new TreeMap<>();
6969

70-
this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
70+
this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());
7171

7272
columnWriterProvider = new ColumnWriterProvider() {
7373
@Override
@@ -95,7 +95,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
9595
}
9696
this.columns = unmodifiableMap(mcolumns);
9797

98-
this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
98+
this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());
9999

100100
columnWriterProvider = new ColumnWriterProvider() {
101101
@Override
@@ -190,13 +190,17 @@ public void endRecord() {
190190

191191
private void sizeCheck() {
192192
long minRecordToWait = Long.MAX_VALUE;
193+
int pageRowCountLimit = props.getPageRowCountLimit();
194+
long rowCountForNextRowCountCheck = rowCount + pageRowCountLimit;
193195
for (ColumnWriterBase writer : columns.values()) {
194196
long usedMem = writer.getCurrentPageBufferedSize();
195197
long rows = rowCount - writer.getRowsWrittenSoFar();
196198
long remainingMem = props.getPageSizeThreshold() - usedMem;
197-
if (remainingMem <= thresholdTolerance) {
199+
if (remainingMem <= thresholdTolerance || rows >= pageRowCountLimit) {
198200
writer.writePage();
199201
remainingMem = props.getPageSizeThreshold();
202+
} else {
203+
rowCountForNextRowCountCheck = min(rowCountForNextRowCountCheck, rowCount + (pageRowCountLimit - rows));
200204
}
201205
long rowsToFillPage =
202206
usedMem == 0 ?
@@ -219,5 +223,10 @@ private void sizeCheck() {
219223
} else {
220224
rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
221225
}
226+
227+
// Do the check earlier if required to keep the row count limit
228+
if (rowCountForNextRowCountCheck < rowCountForNextSizeCheck) {
229+
rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
230+
}
222231
}
223232
}

parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,28 @@
1818
*/
1919
package org.apache.parquet.column.mem;
2020

21+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
22+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
2123
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertTrue;
2225

2326
import org.apache.parquet.column.ColumnDescriptor;
2427
import org.apache.parquet.column.ColumnReader;
28+
import org.apache.parquet.column.ColumnWriteStore;
2529
import org.apache.parquet.column.ColumnWriter;
2630
import org.apache.parquet.column.ParquetProperties;
2731
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
2832
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
33+
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
34+
import org.apache.parquet.column.page.DataPage;
35+
import org.apache.parquet.column.page.PageReader;
2936
import org.apache.parquet.column.page.mem.MemPageStore;
3037
import org.apache.parquet.example.DummyRecordConverter;
3138
import org.apache.parquet.io.api.Binary;
3239
import org.apache.parquet.schema.MessageType;
3340
import org.apache.parquet.schema.MessageTypeParser;
41+
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
42+
import org.apache.parquet.schema.Types;
3443
import org.junit.Test;
3544
import org.slf4j.Logger;
3645
import org.slf4j.LoggerFactory;
@@ -166,6 +175,68 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
166175
}
167176
}
168177

178+
@Test
179+
public void testPageSize() {
180+
MessageType schema = Types.buildMessage()
181+
.requiredList().requiredElement(BINARY).named("binary_col")
182+
.requiredList().requiredElement(INT32).named("int32_col")
183+
.named("msg");
184+
System.out.println(schema);
185+
MemPageStore memPageStore = new MemPageStore(123);
186+
187+
// Using V2 pages so we have rowCount info
188+
ColumnWriteStore writeStore = new ColumnWriteStoreV2(schema, memPageStore, ParquetProperties.builder()
189+
.withPageSize(1024) // Less than 10 records for binary_col
190+
.withMinRowCountForPageSizeCheck(1) // Enforce having precise page sizing
191+
.withPageRowCountLimit(10)
192+
.withDictionaryEncoding(false) // Enforce having large binary_col pages
193+
.build());
194+
ColumnDescriptor binaryCol = schema.getColumnDescription(new String[] { "binary_col", "list", "element" });
195+
ColumnWriter binaryColWriter = writeStore.getColumnWriter(binaryCol);
196+
ColumnDescriptor int32Col = schema.getColumnDescription(new String[] { "int32_col", "list", "element" });
197+
ColumnWriter int32ColWriter = writeStore.getColumnWriter(int32Col);
198+
// Writing 123 records
199+
for (int i = 0; i < 123; ++i) {
200+
// Writing 10 values per record
201+
for (int j = 0; j < 10; ++j) {
202+
binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2);
203+
int32ColWriter.write(42, j == 0 ? 0 : 2, 2);
204+
}
205+
writeStore.endRecord();
206+
}
207+
writeStore.flush();
208+
209+
// Check that all the binary_col pages are <= 1024 bytes
210+
{
211+
PageReader binaryColPageReader = memPageStore.getPageReader(binaryCol);
212+
assertEquals(1230, binaryColPageReader.getTotalValueCount());
213+
int pageCnt = 0;
214+
int valueCnt = 0;
215+
while (valueCnt < binaryColPageReader.getTotalValueCount()) {
216+
DataPage page = binaryColPageReader.readPage();
217+
++pageCnt;
218+
valueCnt += page.getValueCount();
219+
LOG.info("binary_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
220+
assertTrue("Compressed size should be less than 1024", page.getCompressedSize() <= 1024);
221+
}
222+
}
223+
224+
// Check that all the int32_col pages contain <= 10 rows
225+
{
226+
PageReader int32ColPageReader = memPageStore.getPageReader(int32Col);
227+
assertEquals(1230, int32ColPageReader.getTotalValueCount());
228+
int pageCnt = 0;
229+
int valueCnt = 0;
230+
while (valueCnt < int32ColPageReader.getTotalValueCount()) {
231+
DataPage page = int32ColPageReader.readPage();
232+
++pageCnt;
233+
valueCnt += page.getValueCount();
234+
LOG.info("int32_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
235+
assertTrue("Row count should be less than 10", page.getIndexRowCount().get() <= 10);
236+
}
237+
}
238+
}
239+
169240
private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
170241
return new ColumnWriteStoreV1(memPageStore,
171242
ParquetProperties.builder()

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public static enum JobSummaryLevel {
144144
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
145145
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
146146
public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
147+
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
147148

148149
public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
149150
String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -325,6 +326,18 @@ private static int getColumnIndexTruncateLength(Configuration conf) {
325326
return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
326327
}
327328

329+
public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
330+
setPageRowCountLimit(getConfiguration(jobContext), rowCount);
331+
}
332+
333+
public static void setPageRowCountLimit(Configuration conf, int rowCount) {
334+
conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount);
335+
}
336+
337+
private static int getPageRowCountLimit(Configuration conf) {
338+
return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
339+
}
340+
328341
private WriteSupport<T> writeSupport;
329342
private ParquetOutputCommitter committer;
330343

@@ -380,6 +393,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
380393
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
381394
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
382395
.withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
396+
.withPageRowCountLimit(getPageRowCountLimit(conf))
383397
.build();
384398

385399
long blockSize = getLongBlockSize(conf);
@@ -398,6 +412,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
398412
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
399413
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
400414
LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
415+
LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
401416
}
402417

403418
WriteContext init = writeSupport.init(conf);

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,17 @@ public SELF withPageSize(int pageSize) {
425425
return self();
426426
}
427427

428+
/**
429+
* Sets the Parquet format page row count limit used by the constructed writer.
430+
*
431+
* @param rowCount limit for the number of rows stored in a page
432+
* @return this builder for method chaining
433+
*/
434+
public SELF withPageRowCountLimit(int rowCount) {
435+
encodingPropsBuilder.withPageRowCountLimit(rowCount);
436+
return self();
437+
}
438+
428439
/**
429440
* Set the Parquet format dictionary page size used by the constructed
430441
* writer.

0 commit comments

Comments
 (0)