From 1df4a71d5a8f6e7c0adae142ce16bfccd34de999 Mon Sep 17 00:00:00 2001 From: julien Date: Thu, 8 Jan 2015 11:49:46 -0800 Subject: [PATCH] refactor CapacityByteArray to be aware of page size --- .../parquet/column/ParquetProperties.java | 38 +++--- .../column/impl/ColumnWriteStoreV2.java | 2 +- .../parquet/column/impl/ColumnWriterV1.java | 6 +- .../parquet/column/impl/ColumnWriterV2.java | 9 +- .../bitpacking/BitPackingValuesWriter.java | 5 +- .../column/values/boundedint/BitWriter.java | 4 +- .../boundedint/BoundedIntValuesFactory.java | 4 +- .../boundedint/BoundedIntValuesWriter.java | 4 +- .../delta/DeltaBinaryPackingValuesWriter.java | 14 +-- .../DeltaLengthByteArrayValuesWriter.java | 10 +- .../deltastrings/DeltaByteArrayWriter.java | 8 +- .../dictionary/DictionaryValuesWriter.java | 14 +-- .../FixedLenByteArrayPlainValuesWriter.java | 14 +-- .../values/plain/PlainValuesWriter.java | 4 +- .../rle/RunLengthBitPackingHybridEncoder.java | 4 +- ...RunLengthBitPackingHybridValuesWriter.java | 4 +- .../column/impl/TestColumnReaderImpl.java | 4 +- .../bitpacking/TestBitPackingColumn.java | 2 +- .../values/boundedint/TestBoundedColumns.java | 4 +- .../DeltaBinaryPackingValuesWriterTest.java | 4 +- .../benchmark/BenchmarkIntegerOutputSize.java | 4 +- .../BenchmarkReadingRandomIntegers.java | 4 +- .../benchmark/RandomWritingBenchmarkTest.java | 6 +- .../SmallRangeWritingBenchmarkTest.java | 2 +- .../TestDeltaLengthByteArray.java | 8 +- .../BenchmarkDeltaLengthByteArray.java | 16 +-- .../deltastrings/TestDeltaByteArray.java | 12 +- .../benchmark/BenchmarkDeltaByteArray.java | 28 ++--- .../values/dictionary/TestDictionary.java | 2 +- ...LengthBitPackingHybridIntegrationTest.java | 2 +- .../TestRunLengthBitPackingHybridEncoder.java | 22 ++-- .../bytes/CapacityByteArrayOutputStream.java | 113 +++++++----------- .../TestCapacityByteArrayOutputStream.java | 26 ++-- .../hadoop/ColumnChunkPageWriteStore.java | 11 +- .../hadoop/InternalParquetRecordWriter.java | 2 +- .../hadoop/TestColumnChunkPageWriteStore.java | 2 +- 36 files changed, 202 insertions(+), 216 deletions(-) diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java index dc7774f27b..c083867c09 100644 --- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/parquet/column/ParquetProperties.java @@ -63,29 +63,29 @@ public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean this.enableDictionary = enableDict; } - public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol) { + public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) { if (maxLevel == 0) { return new DevNullValuesWriter(); } else { return new RunLengthBitPackingHybridValuesWriter( - getWidthFromMaxInt(maxLevel), initialSizePerCol); + getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize); } } - private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol) { + private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) { switch (path.getType()) { case BOOLEAN: return new BooleanPlainValuesWriter(); case INT96: - return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol); + return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize); case FIXED_LEN_BYTE_ARRAY: - return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol); + 
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize); case BINARY: case INT32: case INT64: case DOUBLE: case FLOAT: - return new PlainValuesWriter(initialSizePerCol); + return new PlainValuesWriter(initialSizePerCol, pageSize); default: throw new IllegalArgumentException("Unknown type " + path.getType()); } @@ -128,24 +128,24 @@ private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initi } } - private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol) { + private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) { switch(writerVersion) { case PARQUET_1_0: - return plainWriter(path, initialSizePerCol); + return plainWriter(path, initialSizePerCol, pageSize); case PARQUET_2_0: switch (path.getType()) { case BOOLEAN: - return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol); + return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize); case BINARY: case FIXED_LEN_BYTE_ARRAY: - return new DeltaByteArrayWriter(initialSizePerCol); + return new DeltaByteArrayWriter(initialSizePerCol, pageSize); case INT32: - return new DeltaBinaryPackingValuesWriter(initialSizePerCol); + return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize); case INT96: case INT64: case DOUBLE: case FLOAT: - return plainWriter(path, initialSizePerCol); + return plainWriter(path, initialSizePerCol, pageSize); default: throw new IllegalArgumentException("Unknown type " + path.getType()); } @@ -154,8 +154,8 @@ private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePe } } - private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol) { - ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol); + private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) { + ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize); if (enableDictionary) { return FallbackValuesWriter.of( dictionaryWriter(path, initialSizePerCol), @@ -165,16 +165,16 @@ private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSi } } - public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) { + public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) { switch (path.getType()) { case BOOLEAN: // no dictionary encoding for boolean - return writerToFallbackTo(path, initialSizePerCol); + return writerToFallbackTo(path, initialSizePerCol, pageSize); case FIXED_LEN_BYTE_ARRAY: // dictionary encoding for that type was not enabled in PARQUET 1.0 if (writerVersion == WriterVersion.PARQUET_2_0) { - return dictWriterWithFallBack(path, initialSizePerCol); + return dictWriterWithFallBack(path, initialSizePerCol, pageSize); } else { - return writerToFallbackTo(path, initialSizePerCol); + return writerToFallbackTo(path, initialSizePerCol, pageSize); } case BINARY: case INT32: @@ -182,7 +182,7 @@ public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol case INT96: case DOUBLE: case FLOAT: - return dictWriterWithFallBack(path, initialSizePerCol); + return dictWriterWithFallBack(path, initialSizePerCol, pageSize); default: throw new IllegalArgumentException("Unknown type " + path.getType()); } diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java 
b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java index ba6edf3d12..03a219d13e 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java @@ -61,7 +61,7 @@ public ColumnWriteStoreV2( Map mcolumns = new TreeMap(); for (ColumnDescriptor path : schema.getColumns()) { PageWriter pageWriter = pageWriteStore.getPageWriter(path); - mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps)); + mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps, pageSizeThreshold)); } this.columns = unmodifiableMap(mcolumns); this.writers = this.columns.values(); diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java index 8b72207da6..ac3fc19e3c 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java @@ -69,9 +69,9 @@ public ColumnWriterV1( resetStatistics(); ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary); - this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol); - this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol); - this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol); + this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol, pageSizeThreshold); + this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol, pageSizeThreshold); + this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSizeThreshold); } private void log(Object value, int r, int d) { diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java index 65f0366727..100bca28ae 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java @@ -58,13 +58,14 @@ public ColumnWriterV2( ColumnDescriptor path, PageWriter pageWriter, int initialSizePerCol, - ParquetProperties parquetProps) { + ParquetProperties parquetProps, + int pageSize) { this.path = path; this.pageWriter = pageWriter; resetStatistics(); - this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol); - this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol); - this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol); + this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol, pageSize); + this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol, pageSize); + this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSize); } private void log(Object value, int r, int d) { diff --git a/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesWriter.java 
b/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesWriter.java index 52e651462d..18813bb7c3 100644 --- a/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesWriter.java @@ -42,10 +42,11 @@ public class BitPackingValuesWriter extends ValuesWriter { /** * @param bound the maximum value stored by this column + * @param pageSize */ - public BitPackingValuesWriter(int bound, int initialCapacity) { + public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) { this.bitsPerValue = getWidthFromMaxInt(bound); - this.out = new CapacityByteArrayOutputStream(initialCapacity); + this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize); init(); } diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BitWriter.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BitWriter.java index cce73d4174..7219fbd52a 100644 --- a/parquet-column/src/main/java/parquet/column/values/boundedint/BitWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BitWriter.java @@ -38,8 +38,8 @@ class BitWriter { } } - public BitWriter(int initialCapacity) { - this.baos = new CapacityByteArrayOutputStream(initialCapacity); + public BitWriter(int initialCapacity, int pageSize) { + this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize); } public void writeBit(boolean bit) { diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesFactory.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesFactory.java index a60c481c4f..f7db89b337 100644 --- a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesFactory.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesFactory.java @@ -23,7 +23,7 @@ public static ValuesReader getBoundedReader(int bound) { return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound); } - public static ValuesWriter getBoundedWriter(int bound, int initialCapacity) { - return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity); + public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) { + return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize); } } diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesWriter.java index 7f4ac6218e..576c6ed0bb 100644 --- a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesWriter.java @@ -56,11 +56,11 @@ class BoundedIntValuesWriter extends ValuesWriter { } } - public BoundedIntValuesWriter(int bound, int initialCapacity) { + public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) { if (bound == 0) { throw new ParquetEncodingException("Value bound cannot be 0. 
Use DevNullColumnWriter instead."); } - this.bitWriter = new BitWriter(initialCapacity); + this.bitWriter = new BitWriter(initialCapacity, pageSize); bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2)); shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue); if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold); diff --git a/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java index e98cf6749b..ffe6956fdd 100644 --- a/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java @@ -52,9 +52,9 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter { * reused between flushes. */ public static final int MAX_BITWIDTH = 32; - + public static final int DEFAULT_NUM_BLOCK_VALUES = 128; - + public static final int DEFAULT_NUM_MINIBLOCKS = 4; private final CapacityByteArrayOutputStream baos; @@ -107,17 +107,17 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter { * it will be reset after each flush */ private int minDeltaInCurrentBlock = Integer.MAX_VALUE; - - public DeltaBinaryPackingValuesWriter(int slabSize) { - this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize); + + public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) { + this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize); } - public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize) { + public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) { this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum); bitWidths = new int[config.miniBlockNumInABlock]; deltaBlockBuffer = new int[blockSizeInValues]; miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH]; - baos = new CapacityByteArrayOutputStream(slabSize); + baos = new CapacityByteArrayOutputStream(slabSize, pageSize); } @Override diff --git a/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java index 6564a1b42d..5b95a19a4a 100644 --- a/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java @@ -32,7 +32,7 @@ *
  *   {@code
  *   delta-length-byte-array : length* byte-array*
- *   } 
+ *   }
  * 
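  *   e.g. (an illustration, not part of this patch; values borrowed from the tests below)
  *   writing "parquet-mr", "parquet", "parquet-format" produces:
  *   {@code
  *   lengths    : 10 7 14                            (one DELTA_BINARY_PACKED block)
  *   byte-array : "parquet-mrparquetparquet-format"  (31 bytes, concatenated)
  *   }
  * 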
* @author Aniket Mokashi * @@ -45,13 +45,13 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter { private CapacityByteArrayOutputStream arrayOut; private LittleEndianDataOutputStream out; - public DeltaLengthByteArrayValuesWriter(int initialSize) { - arrayOut = new CapacityByteArrayOutputStream(initialSize); + public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) { + arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize); out = new LittleEndianDataOutputStream(arrayOut); lengthWriter = new DeltaBinaryPackingValuesWriter( DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES, DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS, - initialSize); + initialSize, pageSize); } @Override @@ -98,6 +98,6 @@ public long getAllocatedSize() { @Override public String memUsageString(String prefix) { - return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY"); + return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY"); } } diff --git a/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayWriter.java index 8bfb30d80b..c6ebacb849 100644 --- a/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayWriter.java @@ -27,7 +27,7 @@ *
  *   {@code
  *   delta-byte-array : prefix-length* suffixes*
- *   } 
+ *   }
  * 
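  *   e.g. (an illustration, not part of this patch; values and prefix lengths taken from
  *   TestDeltaByteArray below) writing "parquet-mr", "parquet", "parquet-format" produces:
  *   {@code
  *   prefix-lengths : 0 7 7                      (bytes shared with the previous value)
  *   suffixes       : "parquet-mr" "" "-format"  (stored as delta-length-byte-array)
  *   }
  * 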
* @author Aniket Mokashi * @@ -38,9 +38,9 @@ public class DeltaByteArrayWriter extends ValuesWriter{ private ValuesWriter suffixWriter; private byte[] previous; - public DeltaByteArrayWriter(int initialCapacity) { - this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity); - this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity); + public DeltaByteArrayWriter(int initialCapacity, int pageSize) { + this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize); + this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize); this.previous = new byte[0]; } diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java index 624aa30741..9488a4c8ce 100644 --- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java @@ -143,7 +143,7 @@ public BytesInput getBytes() { if (DEBUG) LOG.debug("max dic id " + maxDicId); int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId); // TODO: what is a good initialCapacity? - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024, maxDictionaryByteSize); IntIterator iterator = encodedValues.iterator(); try { while (iterator.hasNext()) { @@ -237,7 +237,7 @@ public void writeBytes(Binary v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); Iterator binaryIterator = binaryDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -312,7 +312,7 @@ public void writeBytes(Binary value) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize); + FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize); Iterator binaryIterator = binaryDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -357,7 +357,7 @@ public void writeLong(long v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); LongIterator longIterator = longDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -429,7 +429,7 @@ public void writeDouble(double v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used 
it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -501,7 +501,7 @@ public void writeInteger(int v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); it.unimi.dsi.fastutil.ints.IntIterator intIterator = intDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -573,7 +573,7 @@ public void writeFloat(float v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); FloatIterator floatIterator = floatDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { diff --git a/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java index 4047f897f8..a502aa24d7 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java @@ -33,14 +33,14 @@ */ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter { private static final Log LOG = Log.getLog(PlainValuesWriter.class); - + private CapacityByteArrayOutputStream arrayOut; private LittleEndianDataOutputStream out; private int length; - - public FixedLenByteArrayPlainValuesWriter(int length, int initialSize) { + + public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) { this.length = length; - this.arrayOut = new CapacityByteArrayOutputStream(initialSize); + this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize); this.out = new LittleEndianDataOutputStream(arrayOut); } @@ -56,7 +56,7 @@ public final void writeBytes(Binary v) { throw new ParquetEncodingException("could not write fixed bytes", e); } } - + @Override public long getBufferedSize() { return arrayOut.size(); @@ -72,7 +72,7 @@ public BytesInput getBytes() { if (Log.DEBUG) LOG.debug("writing a buffer of size " + arrayOut.size()); return BytesInput.from(arrayOut); } - + @Override public void reset() { arrayOut.reset(); @@ -82,7 +82,7 @@ public void reset() { public long getAllocatedSize() { return arrayOut.getCapacity(); } - + @Override public Encoding getEncoding() { return Encoding.PLAIN; diff --git a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesWriter.java index bc29f8bffe..a85cc7e901 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesWriter.java +++ 
b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesWriter.java @@ -41,8 +41,8 @@ public class PlainValuesWriter extends ValuesWriter { private CapacityByteArrayOutputStream arrayOut; private LittleEndianDataOutputStream out; - public PlainValuesWriter(int initialSize) { - arrayOut = new CapacityByteArrayOutputStream(initialSize); + public PlainValuesWriter(int initialSize, int pageSize) { + arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize); out = new LittleEndianDataOutputStream(arrayOut); } diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index e8b4a8aca4..da1ee08364 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -113,7 +113,7 @@ public class RunLengthBitPackingHybridEncoder { private boolean toBytesCalled; - public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity) { + public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize) { if (DEBUG) { LOG.debug(String.format("Encoding: RunLengthBitPackingHybridEncoder with " + "bithWidth: %d initialCapacity %d", bitWidth, initialCapacity)); @@ -122,7 +122,7 @@ public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity) { Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; - this.baos = new CapacityByteArrayOutputStream(initialCapacity); + this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize); this.packBuffer = new byte[bitWidth]; this.bufferedValues = new int[8]; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java index ed0ac97b29..c50da1888b 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java @@ -29,8 +29,8 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter { private final RunLengthBitPackingHybridEncoder encoder; - public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity) { - this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity); + public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize) { + this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize); } @Override diff --git a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java index 325bf431f2..bcff4761e2 100644 --- a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java +++ b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java @@ -38,7 +38,7 @@ public void test() { MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }"); ColumnDescriptor col = schema.getColumns().get(0); MemPageWriter pageWriter = new MemPageWriter(); - ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new 
ParquetProperties(1024, PARQUET_2_0, true)); + ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048); for (int i = 0; i < rows; i++) { columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0); if ((i + 1) % 1000 == 0) { @@ -73,7 +73,7 @@ public void testOptional() { MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }"); ColumnDescriptor col = schema.getColumns().get(0); MemPageWriter pageWriter = new MemPageWriter(); - ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true)); + ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048); for (int i = 0; i < rows; i++) { columnWriterV2.writeNull(0, 0); if ((i + 1) % 1000 == 0) { diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java index 0351db876b..490982ee32 100644 --- a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java +++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java @@ -185,7 +185,7 @@ public ValuesReader getReader(final int bound) { return new BitPackingValuesReader(bound); } public ValuesWriter getWriter(final int bound) { - return new BitPackingValuesWriter(bound, 32*1024); + return new BitPackingValuesWriter(bound, 32*1024, 64*1024); } } , diff --git a/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java index b8ee5fbac7..a7e3c3618a 100644 --- a/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java +++ b/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java @@ -54,7 +54,7 @@ public void testWriterNoRepeat() throws IOException { } private void compareOutput(int bound, int[] ints, String[] result) throws IOException { - BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024); + BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024, 64*1024); for (int i : ints) { bicw.writeInteger(i); } @@ -123,7 +123,7 @@ public void testSerDe() throws Exception { ByteArrayOutputStream tmp = new ByteArrayOutputStream(); int[] stream = new int[totalValuesInStream]; - BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024); + BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024); int idx = 0; for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) { int next = 0; diff --git a/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java index 415f5097ff..aac7cf22bd 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java @@ -39,13 +39,13 @@ public class DeltaBinaryPackingValuesWriterTest { public void setUp() { blockSize = 128; miniBlockNum = 4; - writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100); + writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200); random = new 
Random(); } @Test(expected = IllegalArgumentException.class) public void miniBlockSizeShouldBeMultipleOf8() { - new DeltaBinaryPackingValuesWriter(1281, 4, 100); + new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100); } /* When data size is multiple of Block*/ diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java index 08eaf397fd..645759202b 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java @@ -74,8 +74,8 @@ public int getIntValue() { } public void testRandomIntegers(IntFunc func,int bitWidth) { - DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum,100); - RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth,100); + DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000); + RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000); for (int i = 0; i < dataSize; i++) { int v = func.getIntValue(); delta.writeInteger(v); diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java index d01a605dc4..6e42dadad5 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java @@ -51,8 +51,8 @@ public static void prepare() throws IOException { data[i] = random.nextInt(100) - 200; } - ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100); - ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100); + ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); + ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000); for (int i = 0; i < data.length; i++) { delta.writeInteger(data[i]); diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java index 0e4ec0b56c..77f5bcdc75 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java @@ -47,21 +47,21 @@ public static void prepare() { @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeDeltaPackingTest(){ - DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100); + DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); runWriteTest(writer); } @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeRLETest(){ - ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32,100); + ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000); runWriteTest(writer); } @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void 
writeDeltaPackingTest2(){ - DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100); + DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); runWriteTest(writer); } } diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java index f5ef85e228..bf7b030616 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java @@ -39,7 +39,7 @@ public static void prepare() { @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeRLEWithSmallBitWidthTest(){ - ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2,100); + ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000); runWriteTest(writer); } } diff --git a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java index 8cb39b1a30..9ceb6f8787 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java @@ -31,7 +31,7 @@ public class TestDeltaLengthByteArray { @Test public void testSerialization () throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); Utils.writeData(writer, values); @@ -41,10 +41,10 @@ public void testSerialization () throws IOException { Assert.assertEquals(Binary.fromString(values[i]), bin[i]); } } - + @Test public void testRandomStrings() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); String[] values = Utils.getRandomStringSamples(1000, 32); @@ -58,7 +58,7 @@ public void testRandomStrings() throws IOException { @Test public void testLengths() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); diff --git a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java index 7261f5146a..16ff624db1 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java @@ -35,30 +35,30 @@ @AxisRange(min = 0, max = 1) @BenchmarkMethodChart(filePrefix = 
"benchmark-encoding-writing-random") public class BenchmarkDeltaLengthByteArray { - + @Rule public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule(); - + String[] values = Utils.getRandomStringSamples(1000000, 32); - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64*1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); - + Utils.writeData(writer, values); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); System.out.println("size " + data.length); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); - + Utils.writeData(writer, values); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); diff --git a/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java index c784491bbf..fb52d52b0c 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java @@ -26,13 +26,13 @@ import parquet.io.api.Binary; public class TestDeltaByteArray { - + static String[] values = {"parquet-mr", "parquet", "parquet-format"}; static String[] randvalues = Utils.getRandomStringSamples(10000, 32); @Test public void testSerialization () throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, values); @@ -42,10 +42,10 @@ public void testSerialization () throws IOException { Assert.assertEquals(Binary.fromString(values[i]), bin[i]); } } - + @Test public void testRandomStrings() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, randvalues); @@ -58,7 +58,7 @@ public void testRandomStrings() throws IOException { @Test public void testLengths() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); @@ -69,7 +69,7 @@ public void testLengths() throws IOException { Assert.assertEquals(0, bin[0]); Assert.assertEquals(7, bin[1]); Assert.assertEquals(7, bin[2]); - + int offset = reader.getNextOffset(); reader = new DeltaBinaryPackingValuesReader(); bin = Utils.readInts(reader, writer.getBytes().toByteArray(), offset, values.length); diff --git a/parquet-column/src/test/java/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java 
b/parquet-column/src/test/java/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java index f2977eac68..4eea154943 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java @@ -36,10 +36,10 @@ @AxisRange(min = 0, max = 1) @BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random") public class BenchmarkDeltaByteArray { - + @Rule public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule(); - + static String[] values = Utils.getRandomStringSamples(1000000, 32); static String[] sortedVals; static @@ -47,49 +47,49 @@ public class BenchmarkDeltaByteArray { sortedVals = Arrays.copyOf(values, values.length); Arrays.sort(sortedVals); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64*1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); - + Utils.writeData(writer, values); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); System.out.println("size " + data.length); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); DeltaByteArrayReader reader = new DeltaByteArrayReader(); - + Utils.writeData(writer, values); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); System.out.println("size " + data.length); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkSortedStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64*1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); - + Utils.writeData(writer, sortedVals); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); System.out.println("size " + data.length); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkSortedStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); DeltaByteArrayReader reader = new DeltaByteArrayReader(); - + Utils.writeData(writer, sortedVals); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); diff --git a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java index 5f2894ee9a..9a937de41f 100644 --- a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java @@ -50,7 +50,7 @@ public class TestDictionary { private FallbackValuesWriter plainFallBack(I dvw, int initialSize) { - return FallbackValuesWriter.of(dvw, new 
PlainValuesWriter(initialSize)); + return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5)); } private FallbackValuesWriter newPlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) { diff --git a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 2359d8de72..da11411d73 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -37,7 +37,7 @@ public void integrationTest() throws Exception { private void doIntegrationTest(int bitWidth) throws Exception { long modValue = 1L << bitWidth; - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000); int numValues = 0; for (int i = 0; i < 100; i++) { diff --git a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 0859cb10d1..f68f8401d5 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -32,10 +32,10 @@ * @author Alex Levenson */ public class TestRunLengthBitPackingHybridEncoder { - + @Test public void testRLEOnly() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); for (int i = 0; i < 100; i++) { encoder.writeInt(4); } @@ -65,7 +65,7 @@ public void testRepeatedZeros() throws Exception { // make sure that repeated 0s at the beginning // of the stream don't trip up the repeat count - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); for (int i = 0; i < 10; i++) { encoder.writeInt(0); } @@ -83,7 +83,7 @@ public void testRepeatedZeros() throws Exception { @Test public void testBitWidthZero() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(0, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(0, 5, 10); for (int i = 0; i < 10; i++) { encoder.writeInt(0); } @@ -99,7 +99,7 @@ public void testBitWidthZero() throws Exception { @Test public void testBitPackingOnly() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); for (int i = 0; i < 100; i++) { encoder.writeInt(i % 3); @@ -122,7 +122,7 @@ public void testBitPackingOnly() throws Exception { @Test public void testBitPackingOverflow() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); for (int i = 0; i < 1000; i++) { encoder.writeInt(i % 3); @@ -154,7 +154,7 @@ public void testBitPackingOverflow() throws Exception { @Test 
public void testTransitionFromBitPackingToRle() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); // 5 obviously bit-packed values encoder.writeInt(0); @@ -192,7 +192,7 @@ public void testTransitionFromBitPackingToRle() throws Exception { @Test public void testPaddingZerosOnUnfinishedBitPackedRuns() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(5, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(5, 5, 10); for (int i = 0; i < 9; i++) { encoder.writeInt(i+1); } @@ -211,7 +211,7 @@ public void testPaddingZerosOnUnfinishedBitPackedRuns() throws Exception { @Test public void testSwitchingModes() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(9, 100); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(9, 100, 1000); // rle first for (int i = 0; i < 25; i++) { @@ -275,14 +275,14 @@ public void testSwitchingModes() throws Exception { // end of stream assertEquals(-1, is.read()); } - + @Test public void testGroupBoundary() throws Exception { byte[] bytes = new byte[2]; // Create an RLE byte stream that has 3 values (1 literal group) with // bit width 2. - bytes[0] = (1 << 1 )| 1; + bytes[0] = (1 << 1 )| 1; bytes[1] = (1 << 0) | (2 << 2) | (3 << 4); ByteArrayInputStream stream = new ByteArrayInputStream(bytes); RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream); diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index 5c96fb8dd7..3efe9d0e78 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -18,6 +18,7 @@ import static java.lang.Math.max; import static java.lang.String.format; import static java.lang.System.arraycopy; +import static parquet.Preconditions.checkArgument; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -26,7 +27,6 @@ import java.util.List; import parquet.Log; -import parquet.Preconditions; /** * functionality of ByteArrayOutputStream without the memory and copy overhead @@ -42,29 +42,41 @@ public class CapacityByteArrayOutputStream extends OutputStream { private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class); - private static final int MINIMUM_SLAB_SIZE = 64 * 1024; - private static final int EXPONENTIAL_SLAB_SIZE_THRESHOLD = 10; private static final byte[] EMPTY_SLAB = new byte[0]; - private int slabSize; + private int initialSize; + private final int pageSize; private List slabs = new ArrayList(); private byte[] currentSlab; - private int capacity; + private int capacity = 0; private int currentSlabIndex; private int currentSlabPosition; private int size; /** - * @param initialSize the initialSize of the buffer (also slab size) + * defaults pageSize to 1MB + * @param initialSize + * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)} */ + @Deprecated public CapacityByteArrayOutputStream(int initialSize) { - Preconditions.checkArgument(initialSize > 0, "initialSize must be > 0"); + this(initialSize, 1024 * 1024); + } + + /** + * @param initialSize the initialSize of the buffer (also slab 
size) + * @param pageSize + */ + public CapacityByteArrayOutputStream(int initialSize, int pageSize) { + checkArgument(initialSize > 0, "initialSize must be > 0"); + checkArgument(pageSize > 0, "pageSize must be > 0"); + this.pageSize = pageSize; initSlabs(initialSize); } private void initSlabs(int initialSize) { if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSize)); - this.slabSize = initialSize; + this.initialSize = initialSize; this.slabs.clear(); this.capacity = 0; this.currentSlab = EMPTY_SLAB; @@ -73,36 +85,30 @@ private void initSlabs(int initialSize) { this.size = 0; } + /** + * the new slab is guaranteed to be at least minimumSize + * @param minimumSize the size of the data we want to copy in the new slab + */ private void addSlab(int minimumSize) { - minimumSize = max(minimumSize, MINIMUM_SLAB_SIZE); this.currentSlabIndex += 1; - if (currentSlabIndex < this.slabs.size()) { - // reuse existing slab - this.currentSlab = this.slabs.get(currentSlabIndex); - if (Log.DEBUG) LOG.debug(format("reusing slab of size %d", currentSlab.length)); - if (currentSlab.length < minimumSize) { - if (Log.DEBUG) LOG.debug(format("slab size %,d too small for value of size %,d. replacing slab", currentSlab.length, minimumSize)); - byte[] newSlab = new byte[minimumSize]; - capacity += newSlab.length - currentSlab.length; - this.currentSlab = newSlab; - this.slabs.set(currentSlabIndex, newSlab); - } + int nextSlabSize; + if (size == 0) { + nextSlabSize = initialSize; + } else if (size > pageSize / 5) { + // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size + nextSlabSize = pageSize / 5; } else { - if (currentSlabIndex > EXPONENTIAL_SLAB_SIZE_THRESHOLD) { - // make slabs bigger in case we are creating too many of them - // double slab size every time. - this.slabSize = size; - if (Log.DEBUG) LOG.debug(format("used %d slabs, new slab size %d", currentSlabIndex, slabSize)); - } - if (slabSize < minimumSize) { - if (Log.DEBUG) LOG.debug(format("slab size %,d too small for value of size %,d. Bumping up slab size", slabSize, minimumSize)); - this.slabSize = minimumSize; - } - if (Log.DEBUG) LOG.debug(format("new slab of size %d", slabSize)); - this.currentSlab = new byte[slabSize]; - this.slabs.add(currentSlab); - this.capacity += slabSize; + // double the size every time + nextSlabSize = size; + } + if (nextSlabSize < minimumSize) { + if (Log.DEBUG) LOG.debug(format("slab size %,d too small for value of size %,d. Bumping up slab size", nextSlabSize, minimumSize)); + nextSlabSize = minimumSize; } + if (Log.DEBUG) LOG.debug(format("used %d slabs, new slab size %d", currentSlabIndex, nextSlabSize)); + this.currentSlab = new byte[nextSlabSize]; + this.slabs.add(currentSlab); + this.capacity += nextSlabSize; this.currentSlabPosition = 0; } @@ -161,38 +167,12 @@ public int getCapacity() { /** * When re-using an instance with reset, it will adjust slab size based on previous data size. * The intent is to reuse the same instance for the same type of data (for example, the same column). - * The assumption is that the size in the buffer will be consistent. Otherwise we fall back to exponentialy double the slab size. - *
<ul> - *   <li>if we used less than half of the first slab (and it is above the minimum slab size), we will make the slab size smaller. - *   <li>if we used more than the slab count threshold (10), we will re-adjust the slab size. - * </ul> - * if re-adjusting the slab size we will make it 1/5th of the previous used size in the buffer so that overhead of extra memory allocation is about 20% - * If we used less than the available slabs we free up the unused ones to reduce memory overhead. + * The assumption is that the size in the buffer will be consistent. */ public void reset() { - // heuristics to adjust slab size - if ( - // if we have only one slab, make sure it is not way too big (more than twice what we need). Except if the slab is already small - (currentSlabIndex == 0 && currentSlabPosition < currentSlab.length / 2 && currentSlab.length > MINIMUM_SLAB_SIZE) - || - // we want to avoid generating too many slabs. - (currentSlabIndex > EXPONENTIAL_SLAB_SIZE_THRESHOLD) - ){ - // readjust slab size - initSlabs(max(size / 5, MINIMUM_SLAB_SIZE)); // should make overhead to about 20% without incurring many slabs - if (Log.DEBUG) LOG.debug(format("used %d slabs, new slab size %d", currentSlabIndex + 1, slabSize)); - } else if (currentSlabIndex < slabs.size() - 1) { - // free up the slabs that we are not using. We want to minimize overhead - this.slabs = new ArrayList<byte[]>(slabs.subList(0, currentSlabIndex + 1)); - this.capacity = 0; - for (byte[] slab : slabs) { - capacity += slab.length; - } - } - this.currentSlabIndex = -1; - this.currentSlabPosition = 0; - this.currentSlab = EMPTY_SLAB; - this.size = 0; + // readjust slab size. + // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size + initSlabs(max(size / 7, initialSize)); } /** @@ -207,7 +187,7 @@ public long size() { * can be passed to {@link #setByte(long, byte)} in order to change it */ public long getCurrentIndex() { - Preconditions.checkArgument(size > 0, "This is an empty stream"); + checkArgument(size > 0, "This is an empty stream"); return size - 1; } @@ -218,8 +198,7 @@ public long getCurrentIndex() { * @param value the value to replace it with */ public void setByte(long index, byte value) { - Preconditions.checkArgument(index < size, - "Index: " + index + " is >= the current size of: " + size); + checkArgument(index < size, "Index: " + index + " is >= the current size of: " + size); long seen = 0; for (int i = 0; i <= currentSlabIndex; i++) { diff --git a/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java index dd10cd557a..391feb3a20 100644 --- a/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java @@ -29,7 +29,7 @@ public class TestCapacityByteArrayOutputStream { @Test public void testWrite() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); final int expectedSize = 54; for (int i = 0; i < expectedSize; i++) { capacityByteArrayOutputStream.write(i); @@ -40,7 +40,7 @@ public void testWrite() throws Throwable { @Test public void testWriteArray() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); int v = 23; writeArraysOf3(capacityByteArrayOutputStream, v); validate(capacityByteArrayOutputStream, v * 3); @@ -48,7 +48,7 @@ public void
testWriteArrayAndInt() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); for (int i = 0; i < 23; i++) { byte[] toWrite = { (byte)(i * 3), (byte)(i * 3 + 1)}; capacityByteArrayOutputStream.write(toWrite); @@ -59,9 +59,13 @@ } + protected CapacityByteArrayOutputStream newCapacityBAOS(int initialSize) { + return new CapacityByteArrayOutputStream(initialSize, 1000000); + } + @Test public void testReset() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); for (int i = 0; i < 54; i++) { capacityByteArrayOutputStream.write(i); assertEquals(i + 1, capacityByteArrayOutputStream.size()); @@ -80,7 +84,7 @@ public void testReset() throws Throwable { @Test public void testWriteArrayBiggerThanSlab() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); int v = 23; writeArraysOf3(capacityByteArrayOutputStream, v); int n = v * 3; @@ -106,7 +110,7 @@ public void testWriteArrayBiggerThanSlab() throws Throwable { @Test public void testWriteArrayManySlabs() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); int it = 500; int v = 23; for (int j = 0; j < it; j++) { @@ -134,7 +138,7 @@ public void testReplaceByte() throws Throwable { // test replace the first value { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); cbaos.write(10); assertEquals(0, cbaos.getCurrentIndex()); cbaos.setByte(0, (byte) 7); @@ -145,7 +149,7 @@ public void testReplaceByte() throws Throwable { // test replace value in the first slab { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); cbaos.write(10); cbaos.write(13); cbaos.write(15); @@ -160,7 +164,7 @@ public void testReplaceByte() throws Throwable { // test replace in *not* the first slab { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); // advance part way through the 3rd slab for (int i = 0; i < 12; i++) { @@ -178,7 +182,7 @@ public void testReplaceByte() throws Throwable { // test replace last value of a slab { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); // advance part way through the 3rd slab for (int i = 0; i < 12; i++) { @@ -196,7 +200,7 @@ public void testReplaceByte() throws Throwable { // test replace last value { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); // advance part way through the 3rd slab for (int i = 0; i < 12; i++) { diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java index 64fb7cdd8d..1a0e42eaee
100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -16,6 +16,7 @@ package parquet.hadoop; import static parquet.Log.INFO; +import static parquet.column.statistics.Statistics.getStatsBasedOnType; import java.io.IOException; import java.util.ArrayList; @@ -60,11 +61,11 @@ private static final class ColumnChunkPageWriter implements PageWriter { private Statistics totalStatistics; - private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) { + private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize, int pageSize) { this.path = path; this.compressor = compressor; - this.buf = new CapacityByteArrayOutputStream(initialSize); - this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType()); + this.buf = new CapacityByteArrayOutputStream(initialSize, pageSize); + this.totalStatistics = getStatsBasedOnType(this.path.getType()); } @Override @@ -201,9 +202,9 @@ public String memUsageString(String prefix) { private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>(); - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) { + public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize, int pageSize) { for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize)); + writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize, pageSize)); } } diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java index 5026bbe290..cd8875d590 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java @@ -105,7 +105,7 @@ private void initStore() { // therefore this size is cast to int, since allocating byte array in under layer needs to // limit the array size in an int scope.
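// as an illustration (not in the original source): with the default 128 MB row group and a 10-column schema this works out to max(MINIMUM_BUFFER_SIZE, 134217728 / 10 / 5), i.e. roughly 2.6 MB of initial buffer per column chunk, assuming MINIMUM_BUFFER_SIZE is 64 * 1024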
int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5)); - pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); + pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize, pageSize); // we don't want this number to be too small either // ideally, slightly bigger than the page size, but not bigger than the block buffer int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java index f499d1acba..e1223b666c 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -64,7 +64,7 @@ public void test() throws Exception { writer.start(); writer.startBlock(rowCount); { - ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema , initialSize); + ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema , initialSize, pageSize); PageWriter pageWriter = store.getPageWriter(col); pageWriter.writePageV2( rowCount, nullCount, valueCount,
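
Note (illustrative sketch, not part of the patch): the slab growth policy introduced in CapacityByteArrayOutputStream#addSlab is easiest to see with concrete numbers. The standalone class below replays the same arithmetic; the class name and the initialSize/pageSize values are made up for the example. Slabs double the buffered size until it passes pageSize / 5, then grow linearly by pageSize / 5, and reset() re-seeds the next first slab with max(size / 7, initialSize) so that, per the comment in the patch, three doublings of the initial slab recover the previous buffer size.

public class SlabGrowthSketch {
  public static void main(String[] args) {
    final int initialSize = 10; // first slab size, as in the unit tests above
    final int pageSize = 1000;  // target page size
    int size = 0;               // bytes buffered so far
    for (int slab = 0; slab < 8; slab++) {
      int nextSlabSize;
      if (size == 0) {
        nextSlabSize = initialSize;  // first slab: the configured initial size
      } else if (size > pageSize / 5) {
        nextSlabSize = pageSize / 5; // near the page size: linear growth caps the overhead
      } else {
        nextSlabSize = size;         // otherwise: double the buffered size
      }
      size += nextSlabSize;
      System.out.println("slab " + slab + ": " + nextSlabSize + " bytes (total " + size + ")");
    }
  }
}

With these numbers the slab sizes come out as 10, 10, 20, 40, 80, 160, 200, 200 (totals 10, 20, 40, 80, 160, 320, 520, 720): one extra doubling early on, and at most pageSize / 5 (about 20% of a page) of unused capacity once a page's worth of data has been buffered.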