From c47b973bc4b07e54934468a180aac84a9154d6d6 Mon Sep 17 00:00:00 2001
From: Alex Levenson <alexlevenson@twitter.com>
Date: Wed, 4 Mar 2015 17:26:44 -0800
Subject: [PATCH] PARQUET-160: avoid wasting 64K per empty buffer.

This buffer initializes itself to a default size when instantiated. This leads to a lot of unused small buffers when there are a lot of empty columns.

Author: Alex Levenson <alexlevenson@twitter.com>
Author: julien <julien@twitter.com>
Author: Julien Le Dem <julien@twitter.com>

Closes #98 from julienledem/avoid_wasting_64K_per_empty_buffer and squashes the following commits:

b0200dd [julien] add license
a1b278e [julien] Merge branch 'master' into avoid_wasting_64K_per_empty_buffer
5304ee1 [julien] remove unused constant
81e399f [julien] Merge branch 'avoid_wasting_64K_per_empty_buffer' of github.com:julienledem/incubator-parquet-mr into avoid_wasting_64K_per_empty_buffer
ccf677d [julien] Merge branch 'master' into avoid_wasting_64K_per_empty_buffer
37148d6 [Julien Le Dem] Merge pull request #2 from isnotinvain/PR-98
b9abab0 [Alex Levenson] Address Julien's comment
965af7f [Alex Levenson] one more typo
9939d8d [Alex Levenson] fix typos in comments
61c0100 [Alex Levenson] Make initial slab size heuristic into a helper method, apply in DictionaryValuesWriter as well
a257ee4 [Alex Levenson] Improve IndexOutOfBoundsException message
64d6c7f [Alex Levenson] update comments
8b54667 [Alex Levenson] Don't use CapacityByteArrayOutputStream for writing page chunks
6a20e8b [Alex Levenson] Remove initialSlabSize decision from InternalParquetRecordReader, use a simpler heuristic in the column writers instead
3a0f8e4 [Alex Levenson] Use simpler settings for column chunk writer
b2736a1 [Alex Levenson] Some cleanup in CapacityByteArrayOutputStream
1df4a71 [julien] refactor CapacityByteArray to be aware of page size
95c8fb6 [julien] avoid wasting 64K per empty buffer.
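For context, a minimal sketch of the sizing change the diff below makes: column writers now derive a small initial slab from the page size via CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10) instead of starting every buffer at 64K. The MIN_SLAB_SIZE constant, the 10-slab target, and the call shape are taken from the patch; the body of the heuristic and the demo values are assumptions added for illustration only.

import static java.lang.Math.max;
import static java.lang.Math.pow;

// Sketch of the sizing logic (assumed body; constant and call shape mirror the patch).
class InitialSlabSizeSketch {
  static final int MIN_SLAB_SIZE = 64; // same floor the patch adds to the column writers

  // Start small enough that roughly targetNumSlabs doublings reach targetCapacity,
  // but never go below minSlabSize.
  static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
    return max(minSlabSize, (int) (targetCapacity / pow(2, targetNumSlabs)));
  }

  public static void main(String[] args) {
    int pageSizeThreshold = 1024 * 1024; // hypothetical 1MB page
    // 1MB / 2^10 = 1KB first slab instead of the old 64K default
    System.out.println(initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10));
    // tiny or empty column: the 64-byte floor wins
    System.out.println(initialSlabSizeHeuristic(MIN_SLAB_SIZE, 256, 10));
  }
}

With a 1MB page this yields a 1KB first slab, and an empty column costs only 64 bytes rather than 64K, which is the waste the commit message describes.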
--- .../parquet/column/ParquetProperties.java | 47 ++-- .../column/impl/ColumnWriteStoreV1.java | 6 +- .../column/impl/ColumnWriteStoreV2.java | 4 +- .../parquet/column/impl/ColumnWriterV1.java | 15 +- .../parquet/column/impl/ColumnWriterV2.java | 17 +- .../bitpacking/BitPackingValuesWriter.java | 5 +- .../column/values/boundedint/BitWriter.java | 4 +- .../boundedint/BoundedIntValuesFactory.java | 4 +- .../boundedint/BoundedIntValuesWriter.java | 4 +- .../delta/DeltaBinaryPackingValuesWriter.java | 14 +- .../DeltaLengthByteArrayValuesWriter.java | 10 +- .../deltastrings/DeltaByteArrayWriter.java | 8 +- .../dictionary/DictionaryValuesWriter.java | 22 +- .../FixedLenByteArrayPlainValuesWriter.java | 14 +- .../values/plain/PlainValuesWriter.java | 4 +- .../rle/RunLengthBitPackingHybridEncoder.java | 4 +- ...RunLengthBitPackingHybridValuesWriter.java | 4 +- .../column/impl/TestColumnReaderImpl.java | 4 +- .../parquet/column/mem/TestMemColumn.java | 2 +- .../bitpacking/TestBitPackingColumn.java | 2 +- .../values/boundedint/TestBoundedColumns.java | 4 +- .../DeltaBinaryPackingValuesWriterTest.java | 4 +- .../benchmark/BenchmarkIntegerOutputSize.java | 4 +- .../BenchmarkReadingRandomIntegers.java | 4 +- .../benchmark/RandomWritingBenchmarkTest.java | 6 +- .../SmallRangeWritingBenchmarkTest.java | 2 +- .../TestDeltaLengthByteArray.java | 8 +- .../BenchmarkDeltaLengthByteArray.java | 16 +- .../deltastrings/TestDeltaByteArray.java | 12 +- .../benchmark/BenchmarkDeltaByteArray.java | 28 +- .../values/dictionary/TestDictionary.java | 2 +- ...LengthBitPackingHybridIntegrationTest.java | 2 +- .../TestRunLengthBitPackingHybridEncoder.java | 22 +- .../src/test/java/parquet/io/PerfTest.java | 2 +- .../test/java/parquet/io/TestColumnIO.java | 2 +- .../test/java/parquet/io/TestFiltered.java | 2 +- .../bytes/CapacityByteArrayOutputStream.java | 258 ++++++++++-------- .../ConcatenatingByteArrayCollector.java | 63 +++++ .../TestCapacityByteArrayOutputStream.java | 26 +- .../hadoop/ColumnChunkPageWriteStore.java | 54 ++-- .../hadoop/InternalParquetRecordWriter.java | 16 +- .../hadoop/TestColumnChunkPageWriteStore.java | 13 +- .../parquet/pig/TupleConsumerPerfTest.java | 2 +- .../thrift/TestParquetReadProtocol.java | 2 +- 44 files changed, 430 insertions(+), 318 deletions(-) create mode 100644 parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java index 578c7c3f1b..31bec369ab 100644 --- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/parquet/column/ParquetProperties.java @@ -81,29 +81,29 @@ public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean this.enableDictionary = enableDict; } - public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol) { + public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) { if (maxLevel == 0) { return new DevNullValuesWriter(); } else { return new RunLengthBitPackingHybridValuesWriter( - getWidthFromMaxInt(maxLevel), initialSizePerCol); + getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize); } } - private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol) { + private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) { switch (path.getType()) { case BOOLEAN: return new 
BooleanPlainValuesWriter(); case INT96: - return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol); + return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize); case FIXED_LEN_BYTE_ARRAY: - return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol); + return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize); case BINARY: case INT32: case INT64: case DOUBLE: case FLOAT: - return new PlainValuesWriter(initialSizePerCol); + return new PlainValuesWriter(initialSizePerCol, pageSize); default: throw new IllegalArgumentException("Unknown type " + path.getType()); } @@ -146,24 +146,24 @@ private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initi } } - private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol) { + private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) { switch(writerVersion) { case PARQUET_1_0: - return plainWriter(path, initialSizePerCol); + return plainWriter(path, initialSizePerCol, pageSize); case PARQUET_2_0: switch (path.getType()) { case BOOLEAN: - return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol); + return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize); case BINARY: case FIXED_LEN_BYTE_ARRAY: - return new DeltaByteArrayWriter(initialSizePerCol); + return new DeltaByteArrayWriter(initialSizePerCol, pageSize); case INT32: - return new DeltaBinaryPackingValuesWriter(initialSizePerCol); + return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize); case INT96: case INT64: case DOUBLE: case FLOAT: - return plainWriter(path, initialSizePerCol); + return plainWriter(path, initialSizePerCol, pageSize); default: throw new IllegalArgumentException("Unknown type " + path.getType()); } @@ -172,8 +172,8 @@ private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePe } } - private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol) { - ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol); + private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) { + ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize); if (enableDictionary) { return FallbackValuesWriter.of( dictionaryWriter(path, initialSizePerCol), @@ -183,16 +183,16 @@ private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSi } } - public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) { + public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) { switch (path.getType()) { case BOOLEAN: // no dictionary encoding for boolean - return writerToFallbackTo(path, initialSizePerCol); + return writerToFallbackTo(path, initialSizePerCol, pageSize); case FIXED_LEN_BYTE_ARRAY: // dictionary encoding for that type was not enabled in PARQUET 1.0 if (writerVersion == WriterVersion.PARQUET_2_0) { - return dictWriterWithFallBack(path, initialSizePerCol); + return dictWriterWithFallBack(path, initialSizePerCol, pageSize); } else { - return writerToFallbackTo(path, initialSizePerCol); + return writerToFallbackTo(path, initialSizePerCol, pageSize); } case BINARY: case INT32: @@ -200,7 +200,7 @@ public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol case INT96: case DOUBLE: case FLOAT: - return dictWriterWithFallBack(path, 
initialSizePerCol); + return dictWriterWithFallBack(path, initialSizePerCol, pageSize); default: throw new IllegalArgumentException("Unknown type " + path.getType()); } @@ -220,19 +220,20 @@ public boolean isEnableDictionary() { public ColumnWriteStore newColumnWriteStore( MessageType schema, - PageWriteStore pageStore, int pageSize, - int initialPageBufferSize) { + PageWriteStore pageStore, + int pageSize) { switch (writerVersion) { case PARQUET_1_0: return new ColumnWriteStoreV1( pageStore, - pageSize, initialPageBufferSize, dictionaryPageSizeThreshold, + pageSize, + dictionaryPageSizeThreshold, enableDictionary, writerVersion); case PARQUET_2_0: return new ColumnWriteStoreV2( schema, pageStore, - pageSize, initialPageBufferSize, + pageSize, new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary)); default: throw new IllegalArgumentException("unknown version " + writerVersion); diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java index 94dfc505a7..2c476cac3f 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java @@ -39,14 +39,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore { private final int pageSizeThreshold; private final int dictionaryPageSizeThreshold; private final boolean enableDictionary; - private final int initialSizePerCol; private final WriterVersion writerVersion; - public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) { + public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) { super(); this.pageWriteStore = pageWriteStore; this.pageSizeThreshold = pageSizeThreshold; - this.initialSizePerCol = initialSizePerCol; this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold; this.enableDictionary = enableDictionary; this.writerVersion = writerVersion; @@ -67,7 +65,7 @@ public Set<ColumnDescriptor> getColumnDescriptors() { private ColumnWriterV1 newMemColumn(ColumnDescriptor path) { PageWriter pageWriter = pageWriteStore.getPageWriter(path); - return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion); + return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion); } @Override diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java index 05a5c953aa..2dc342e7c8 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java @@ -56,7 +56,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore { public ColumnWriteStoreV2( MessageType schema, PageWriteStore pageWriteStore, - int pageSizeThreshold, int initialSizePerCol, + int pageSizeThreshold, ParquetProperties parquetProps) { super(); this.pageSizeThreshold = pageSizeThreshold; @@ -64,7 +64,7 @@ public ColumnWriteStoreV2( Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>(); for (ColumnDescriptor path : 
schema.getColumns()) { PageWriter pageWriter = pageWriteStore.getPageWriter(path); - mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps)); + mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold)); } this.columns = unmodifiableMap(mcolumns); this.writers = this.columns.values(); diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java index 7be6b15a04..ce4af80ba1 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java @@ -23,6 +23,7 @@ import java.io.IOException; import parquet.Log; +import parquet.bytes.CapacityByteArrayOutputStream; import parquet.column.ColumnDescriptor; import parquet.column.ColumnWriter; import parquet.column.ParquetProperties; @@ -34,6 +35,9 @@ import parquet.io.ParquetEncodingException; import parquet.io.api.Binary; +import static java.lang.Math.max; +import static java.lang.Math.pow; + /** * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer. * @@ -44,6 +48,7 @@ final class ColumnWriterV1 implements ColumnWriter { private static final Log LOG = Log.getLog(ColumnWriterV1.class); private static final boolean DEBUG = Log.DEBUG; private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100; + private static final int MIN_SLAB_SIZE = 64; private final ColumnDescriptor path; private final PageWriter pageWriter; @@ -60,7 +65,6 @@ public ColumnWriterV1( ColumnDescriptor path, PageWriter pageWriter, int pageSizeThreshold, - int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) { @@ -72,9 +76,12 @@ public ColumnWriterV1( resetStatistics(); ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary); - this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol); - this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol); - this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol); + + this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold); + this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold); + + int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10); + this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold); } private void log(Object value, int r, int d) { diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java index adc88c1e23..e7b8c1c992 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java @@ -18,6 +18,8 @@ */ package parquet.column.impl; +import static java.lang.Math.max; +import static java.lang.Math.pow; import static parquet.bytes.BytesUtils.getWidthFromMaxInt; import java.io.IOException; @@ -25,6 +27,7 @@ import parquet.Ints; import parquet.Log; import parquet.bytes.BytesInput; +import 
parquet.bytes.CapacityByteArrayOutputStream; import parquet.column.ColumnDescriptor; import parquet.column.ColumnWriter; import parquet.column.Encoding; @@ -46,6 +49,7 @@ final class ColumnWriterV2 implements ColumnWriter { private static final Log LOG = Log.getLog(ColumnWriterV2.class); private static final boolean DEBUG = Log.DEBUG; + private static final int MIN_SLAB_SIZE = 64; private final ColumnDescriptor path; private final PageWriter pageWriter; @@ -60,14 +64,17 @@ final class ColumnWriterV2 implements ColumnWriter { public ColumnWriterV2( ColumnDescriptor path, PageWriter pageWriter, - int initialSizePerCol, - ParquetProperties parquetProps) { + ParquetProperties parquetProps, + int pageSize) { this.path = path; this.pageWriter = pageWriter; resetStatistics(); - this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol); - this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol); - this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol); + + this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize); + this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize); + + int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10); + this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize); } private void log(Object value, int r, int d) { diff --git a/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesWriter.java index be6d2a0838..69175ca6d1 100644 --- a/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesWriter.java @@ -45,10 +45,11 @@ public class BitPackingValuesWriter extends ValuesWriter { /** * @param bound the maximum value stored by this column + * @param pageSize */ - public BitPackingValuesWriter(int bound, int initialCapacity) { + public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) { this.bitsPerValue = getWidthFromMaxInt(bound); - this.out = new CapacityByteArrayOutputStream(initialCapacity); + this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize); init(); } diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BitWriter.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BitWriter.java index 041bb50b44..f2f673d325 100644 --- a/parquet-column/src/main/java/parquet/column/values/boundedint/BitWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BitWriter.java @@ -41,8 +41,8 @@ class BitWriter { } } - public BitWriter(int initialCapacity) { - this.baos = new CapacityByteArrayOutputStream(initialCapacity); + public BitWriter(int initialCapacity, int pageSize) { + this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize); } public void writeBit(boolean bit) { diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesFactory.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesFactory.java index 0de7006b58..9bdc1380d8 100644 --- 
a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesFactory.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesFactory.java @@ -26,7 +26,7 @@ public static ValuesReader getBoundedReader(int bound) { return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound); } - public static ValuesWriter getBoundedWriter(int bound, int initialCapacity) { - return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity); + public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) { + return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize); } } diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesWriter.java index 70852150f3..9e521894b8 100644 --- a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesWriter.java @@ -59,11 +59,11 @@ class BoundedIntValuesWriter extends ValuesWriter { } } - public BoundedIntValuesWriter(int bound, int initialCapacity) { + public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) { if (bound == 0) { throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead."); } - this.bitWriter = new BitWriter(initialCapacity); + this.bitWriter = new BitWriter(initialCapacity, pageSize); bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2)); shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue); if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold); diff --git a/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java index ff85d21a86..e2fa25a6b8 100644 --- a/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java @@ -55,9 +55,9 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter { * reused between flushes. 
*/ public static final int MAX_BITWIDTH = 32; - + public static final int DEFAULT_NUM_BLOCK_VALUES = 128; - + public static final int DEFAULT_NUM_MINIBLOCKS = 4; private final CapacityByteArrayOutputStream baos; @@ -110,17 +110,17 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter { * it will be reset after each flush */ private int minDeltaInCurrentBlock = Integer.MAX_VALUE; - - public DeltaBinaryPackingValuesWriter(int slabSize) { - this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize); + + public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) { + this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize); } - public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize) { + public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) { this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum); bitWidths = new int[config.miniBlockNumInABlock]; deltaBlockBuffer = new int[blockSizeInValues]; miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH]; - baos = new CapacityByteArrayOutputStream(slabSize); + baos = new CapacityByteArrayOutputStream(slabSize, pageSize); } @Override diff --git a/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java index 11c197fc29..3fed3f73c5 100644 --- a/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java @@ -35,7 +35,7 @@ * <pre> * {@code * delta-length-byte-array : length* byte-array* - * } + * } * </pre> * @author Aniket Mokashi * @@ -48,13 +48,13 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter { private CapacityByteArrayOutputStream arrayOut; private LittleEndianDataOutputStream out; - public DeltaLengthByteArrayValuesWriter(int initialSize) { - arrayOut = new CapacityByteArrayOutputStream(initialSize); + public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) { + arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize); out = new LittleEndianDataOutputStream(arrayOut); lengthWriter = new DeltaBinaryPackingValuesWriter( DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES, DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS, - initialSize); + initialSize, pageSize); } @Override @@ -101,6 +101,6 @@ public long getAllocatedSize() { @Override public String memUsageString(String prefix) { - return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY"); + return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY"); } } diff --git a/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayWriter.java index f8b05308da..0d1200a092 100644 --- a/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayWriter.java @@ -30,7 +30,7 @@ * <pre> * {@code * delta-length-byte-array : prefix-length* suffixes* - * } + * } * </pre> * @author Aniket Mokashi * @@ -41,9 +41,9 @@ public class DeltaByteArrayWriter extends 
ValuesWriter{ private ValuesWriter suffixWriter; private byte[] previous; - public DeltaByteArrayWriter(int initialCapacity) { - this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity); - this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity); + public DeltaByteArrayWriter(int initialCapacity, int pageSize) { + this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize); + this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize); this.previous = new byte[0]; } diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java index 1dbda6f798..c986c79f78 100644 --- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java @@ -42,6 +42,7 @@ import parquet.Log; import parquet.bytes.BytesInput; import parquet.bytes.BytesUtils; +import parquet.bytes.CapacityByteArrayOutputStream; import parquet.column.Encoding; import parquet.column.page.DictionaryPage; import parquet.column.values.RequiresFallback; @@ -65,6 +66,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /* max entries allowed for the dictionary will fail over to plain encoding if reached */ private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1; + private static final int MIN_INITIAL_SLAB_SIZE = 64; /* encoding to label the data page */ private final Encoding encodingForDataPage; @@ -145,8 +147,12 @@ public BytesInput getBytes() { int maxDicId = getDictionarySize() - 1; if (DEBUG) LOG.debug("max dic id " + maxDicId); int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId); - // TODO: what is a good initialCapacity? 
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024); + + int initialSlabSize = + CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10); + + RunLengthBitPackingHybridEncoder encoder = + new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize); IntIterator iterator = encodedValues.iterator(); try { while (iterator.hasNext()) { @@ -240,7 +246,7 @@ public void writeBytes(Binary v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -315,7 +321,7 @@ public void writeBytes(Binary value) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize); + FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize); Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -360,7 +366,7 @@ public void writeLong(long v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); LongIterator longIterator = longDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -432,7 +438,7 @@ public void writeDouble(double v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -504,7 +510,7 @@ public void writeInteger(int v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); it.unimi.dsi.fastutil.ints.IntIterator intIterator = intDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -576,7 +582,7 @@ public void writeFloat(float v) { public DictionaryPage createDictionaryPage() { if (lastUsedDictionarySize > 0) { // return a 
dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); FloatIterator floatIterator = floatDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { diff --git a/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java index 18a6f8a929..a648efed8e 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java @@ -36,14 +36,14 @@ */ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter { private static final Log LOG = Log.getLog(PlainValuesWriter.class); - + private CapacityByteArrayOutputStream arrayOut; private LittleEndianDataOutputStream out; private int length; - - public FixedLenByteArrayPlainValuesWriter(int length, int initialSize) { + + public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) { this.length = length; - this.arrayOut = new CapacityByteArrayOutputStream(initialSize); + this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize); this.out = new LittleEndianDataOutputStream(arrayOut); } @@ -59,7 +59,7 @@ public final void writeBytes(Binary v) { throw new ParquetEncodingException("could not write fixed bytes", e); } } - + @Override public long getBufferedSize() { return arrayOut.size(); @@ -75,7 +75,7 @@ public BytesInput getBytes() { if (Log.DEBUG) LOG.debug("writing a buffer of size " + arrayOut.size()); return BytesInput.from(arrayOut); } - + @Override public void reset() { arrayOut.reset(); @@ -85,7 +85,7 @@ public void reset() { public long getAllocatedSize() { return arrayOut.getCapacity(); } - + @Override public Encoding getEncoding() { return Encoding.PLAIN; diff --git a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesWriter.java index ed3485e4d6..dd624a4ce5 100644 --- a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesWriter.java @@ -44,8 +44,8 @@ public class PlainValuesWriter extends ValuesWriter { private CapacityByteArrayOutputStream arrayOut; private LittleEndianDataOutputStream out; - public PlainValuesWriter(int initialSize) { - arrayOut = new CapacityByteArrayOutputStream(initialSize); + public PlainValuesWriter(int initialSize, int pageSize) { + arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize); out = new LittleEndianDataOutputStream(arrayOut); } diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index 7d8257d931..145621767d 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -116,7 +116,7 @@ public class RunLengthBitPackingHybridEncoder { private boolean toBytesCalled; - public RunLengthBitPackingHybridEncoder(int bitWidth, int 
initialCapacity) { + public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize) { if (DEBUG) { LOG.debug(String.format("Encoding: RunLengthBitPackingHybridEncoder with " + "bithWidth: %d initialCapacity %d", bitWidth, initialCapacity)); @@ -125,7 +125,7 @@ public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity) { Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; - this.baos = new CapacityByteArrayOutputStream(initialCapacity); + this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize); this.packBuffer = new byte[bitWidth]; this.bufferedValues = new int[8]; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java index 60ac5dee0b..641f5fe7db 100644 --- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java @@ -32,8 +32,8 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter { private final RunLengthBitPackingHybridEncoder encoder; - public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity) { - this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity); + public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize) { + this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize); } @Override diff --git a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java index 5aae7e3db8..1cbc21d580 100644 --- a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java +++ b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java @@ -56,7 +56,7 @@ public void test() { MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }"); ColumnDescriptor col = schema.getColumns().get(0); MemPageWriter pageWriter = new MemPageWriter(); - ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true)); + ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048); for (int i = 0; i < rows; i++) { columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0); if ((i + 1) % 1000 == 0) { @@ -91,7 +91,7 @@ public void testOptional() { MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }"); ColumnDescriptor col = schema.getColumns().get(0); MemPageWriter pageWriter = new MemPageWriter(); - ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true)); + ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048); for (int i = 0; i < rows; i++) { columnWriterV2.writeNull(0, 0); if ((i + 1) % 1000 == 0) { diff --git a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java index b84fec1baa..28440a65bc 100644 --- a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java +++ 
b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java @@ -159,6 +159,6 @@ public void testMemColumnSeveralPagesRepeated() throws Exception { } private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) { - return new ColumnWriteStoreV1(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0); + return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0); } } diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java index 99ba8eb848..7fd89d5189 100644 --- a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java +++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java @@ -188,7 +188,7 @@ public ValuesReader getReader(final int bound) { return new BitPackingValuesReader(bound); } public ValuesWriter getWriter(final int bound) { - return new BitPackingValuesWriter(bound, 32*1024); + return new BitPackingValuesWriter(bound, 32*1024, 64*1024); } } , diff --git a/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java index 4e86ec628a..e2d691352f 100644 --- a/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java +++ b/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java @@ -57,7 +57,7 @@ public void testWriterNoRepeat() throws IOException { } private void compareOutput(int bound, int[] ints, String[] result) throws IOException { - BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024); + BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024, 64*1024); for (int i : ints) { bicw.writeInteger(i); } @@ -126,7 +126,7 @@ public void testSerDe() throws Exception { ByteArrayOutputStream tmp = new ByteArrayOutputStream(); int[] stream = new int[totalValuesInStream]; - BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024); + BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024); int idx = 0; for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) { int next = 0; diff --git a/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java index 0eb7f9543e..79af39dd58 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java @@ -42,13 +42,13 @@ public class DeltaBinaryPackingValuesWriterTest { public void setUp() { blockSize = 128; miniBlockNum = 4; - writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100); + writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200); random = new Random(); } @Test(expected = IllegalArgumentException.class) public void miniBlockSizeShouldBeMultipleOf8() { - new DeltaBinaryPackingValuesWriter(1281, 4, 100); + new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100); } /* When data size is multiple of Block*/ diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java 
b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java index 73d360fbf5..cce4f3815b 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java @@ -77,8 +77,8 @@ public int getIntValue() { } public void testRandomIntegers(IntFunc func,int bitWidth) { - DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum,100); - RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth,100); + DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000); + RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000); for (int i = 0; i < dataSize; i++) { int v = func.getIntValue(); delta.writeInteger(v); diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java index d9704becf1..245769d83d 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java @@ -54,8 +54,8 @@ public static void prepare() throws IOException { data[i] = random.nextInt(100) - 200; } - ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100); - ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100); + ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); + ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000); for (int i = 0; i < data.length; i++) { delta.writeInteger(data[i]); diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java index 08dd285636..15fb13035c 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java @@ -50,21 +50,21 @@ public static void prepare() { @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeDeltaPackingTest(){ - DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100); + DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); runWriteTest(writer); } @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeRLETest(){ - ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32,100); + ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000); runWriteTest(writer); } @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeDeltaPackingTest2(){ - DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100); + DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); runWriteTest(writer); } } diff --git 
a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java index 761a6b387e..39cf0a7ef6 100644 --- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java +++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java @@ -42,7 +42,7 @@ public static void prepare() { @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeRLEWithSmallBitWidthTest(){ - ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2,100); + ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000); runWriteTest(writer); } } diff --git a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java index 0c604e2346..38c5b52903 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java @@ -34,7 +34,7 @@ public class TestDeltaLengthByteArray { @Test public void testSerialization () throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); Utils.writeData(writer, values); @@ -44,10 +44,10 @@ public void testSerialization () throws IOException { Assert.assertEquals(Binary.fromString(values[i]), bin[i]); } } - + @Test public void testRandomStrings() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); String[] values = Utils.getRandomStringSamples(1000, 32); @@ -61,7 +61,7 @@ public void testRandomStrings() throws IOException { @Test public void testLengths() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); diff --git a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java index 2b3b5d1bc6..c74db12648 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java @@ -38,30 +38,30 @@ @AxisRange(min = 0, max = 1) @BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random") public class BenchmarkDeltaLengthByteArray { - + @Rule public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule(); - + String[] values = Utils.getRandomStringSamples(1000000, 32); - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) 
@Test public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64*1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); - + Utils.writeData(writer, values); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); System.out.println("size " + data.length); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); - + Utils.writeData(writer, values); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); diff --git a/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java index 579aa0a743..24dee2472d 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java +++ b/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java @@ -29,13 +29,13 @@ import parquet.io.api.Binary; public class TestDeltaByteArray { - + static String[] values = {"parquet-mr", "parquet", "parquet-format"}; static String[] randvalues = Utils.getRandomStringSamples(10000, 32); @Test public void testSerialization () throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, values); @@ -45,10 +45,10 @@ public void testSerialization () throws IOException { Assert.assertEquals(Binary.fromString(values[i]), bin[i]); } } - + @Test public void testRandomStrings() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, randvalues); @@ -61,7 +61,7 @@ public void testRandomStrings() throws IOException { @Test public void testLengths() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); @@ -72,7 +72,7 @@ public void testLengths() throws IOException { Assert.assertEquals(0, bin[0]); Assert.assertEquals(7, bin[1]); Assert.assertEquals(7, bin[2]); - + int offset = reader.getNextOffset(); reader = new DeltaBinaryPackingValuesReader(); bin = Utils.readInts(reader, writer.getBytes().toByteArray(), offset, values.length); diff --git a/parquet-column/src/test/java/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java index f9457e5a8a..2c67bde35c 100644 --- a/parquet-column/src/test/java/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java +++ 
b/parquet-column/src/test/java/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java @@ -39,10 +39,10 @@ @AxisRange(min = 0, max = 1) @BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random") public class BenchmarkDeltaByteArray { - + @Rule public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule(); - + static String[] values = Utils.getRandomStringSamples(1000000, 32); static String[] sortedVals; static @@ -50,49 +50,49 @@ public class BenchmarkDeltaByteArray { sortedVals = Arrays.copyOf(values, values.length); Arrays.sort(sortedVals); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64*1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); - + Utils.writeData(writer, values); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); System.out.println("size " + data.length); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); DeltaByteArrayReader reader = new DeltaByteArrayReader(); - + Utils.writeData(writer, values); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); System.out.println("size " + data.length); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkSortedStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64*1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); - + Utils.writeData(writer, sortedVals); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); System.out.println("size " + data.length); } - + @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkSortedStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64*1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); DeltaByteArrayReader reader = new DeltaByteArrayReader(); - + Utils.writeData(writer, sortedVals); byte [] data = writer.getBytes().toByteArray(); Binary[] bin = Utils.readData(reader, data, values.length); diff --git a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java index a2cd4bc1e7..93f896de63 100644 --- a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java @@ -53,7 +53,7 @@ public class TestDictionary { private <I extends DictionaryValuesWriter> FallbackValuesWriter<I, PlainValuesWriter> plainFallBack(I dvw, int initialSize) { - return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize)); + return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5)); } private FallbackValuesWriter<PlainBinaryDictionaryValuesWriter, PlainValuesWriter> 
newPlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) { diff --git a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 5b18abe386..80a7b0ade3 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -40,7 +40,7 @@ public void integrationTest() throws Exception { private void doIntegrationTest(int bitWidth) throws Exception { long modValue = 1L << bitWidth; - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000); int numValues = 0; for (int i = 0; i < 100; i++) { diff --git a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 9b51915fb9..82f8e55f90 100644 --- a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -35,10 +35,10 @@ * @author Alex Levenson */ public class TestRunLengthBitPackingHybridEncoder { - + @Test public void testRLEOnly() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); for (int i = 0; i < 100; i++) { encoder.writeInt(4); } @@ -68,7 +68,7 @@ public void testRepeatedZeros() throws Exception { // make sure that repeated 0s at the beginning // of the stream don't trip up the repeat count - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); for (int i = 0; i < 10; i++) { encoder.writeInt(0); } @@ -86,7 +86,7 @@ public void testRepeatedZeros() throws Exception { @Test public void testBitWidthZero() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(0, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(0, 5, 10); for (int i = 0; i < 10; i++) { encoder.writeInt(0); } @@ -102,7 +102,7 @@ public void testBitWidthZero() throws Exception { @Test public void testBitPackingOnly() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); for (int i = 0; i < 100; i++) { encoder.writeInt(i % 3); @@ -125,7 +125,7 @@ public void testBitPackingOnly() throws Exception { @Test public void testBitPackingOverflow() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); for (int i = 0; i < 1000; i++) { encoder.writeInt(i % 3); @@ -157,7 +157,7 @@ public void testBitPackingOverflow() throws Exception { @Test public void testTransitionFromBitPackingToRle() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5); + 
RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); // 5 obviously bit-packed values encoder.writeInt(0); @@ -195,7 +195,7 @@ public void testTransitionFromBitPackingToRle() throws Exception { @Test public void testPaddingZerosOnUnfinishedBitPackedRuns() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(5, 5); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(5, 5, 10); for (int i = 0; i < 9; i++) { encoder.writeInt(i+1); } @@ -214,7 +214,7 @@ public void testPaddingZerosOnUnfinishedBitPackedRuns() throws Exception { @Test public void testSwitchingModes() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(9, 100); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(9, 100, 1000); // rle first for (int i = 0; i < 25; i++) { @@ -278,14 +278,14 @@ public void testSwitchingModes() throws Exception { // end of stream assertEquals(-1, is.read()); } - + @Test public void testGroupBoundary() throws Exception { byte[] bytes = new byte[2]; // Create an RLE byte stream that has 3 values (1 literal group) with // bit width 2. - bytes[0] = (1 << 1 )| 1; + bytes[0] = (1 << 1 )| 1; bytes[1] = (1 << 0) | (2 << 2) | (3 << 4); ByteArrayInputStream stream = new ByteArrayInputStream(bytes); RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream); diff --git a/parquet-column/src/test/java/parquet/io/PerfTest.java b/parquet-column/src/test/java/parquet/io/PerfTest.java index 3701725cc2..42818974e0 100644 --- a/parquet-column/src/test/java/parquet/io/PerfTest.java +++ b/parquet-column/src/test/java/parquet/io/PerfTest.java @@ -77,7 +77,7 @@ private static void read(MemPageStore memPageStore, MessageType myschema, private static void write(MemPageStore memPageStore) { - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); MessageColumnIO columnIO = newColumnFactory(schema); GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); diff --git a/parquet-column/src/test/java/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/parquet/io/TestColumnIO.java index 4a7c30634f..f351eaf51a 100644 --- a/parquet-column/src/test/java/parquet/io/TestColumnIO.java +++ b/parquet-column/src/test/java/parquet/io/TestColumnIO.java @@ -517,7 +517,7 @@ public void testPushParser() { } private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) { - return new ColumnWriteStoreV1(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0); + return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0); } @Test diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java index b3c7478d15..c1d538fd17 100644 --- a/parquet-column/src/test/java/parquet/io/TestFiltered.java +++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java @@ -257,7 +257,7 @@ public void testFilteredNotPaged() { private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) { MemPageStore memPageStore = new MemPageStore(number * 2); - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, 800, false, 
WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0); GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); for ( int i = 0; i < number; i++ ) { diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index 580bb341d1..d307471be1 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -18,6 +18,12 @@ */ package parquet.bytes; +import static java.lang.Math.max; +import static java.lang.Math.pow; +import static java.lang.String.format; +import static java.lang.System.arraycopy; +import static parquet.Preconditions.checkArgument; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -25,113 +31,155 @@ import java.util.List; import parquet.Log; -import parquet.Preconditions; /** - * functionality of ByteArrayOutputStream without the memory and copy overhead + * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying. + * Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output + * stream grows by allocating a new array (slab) and adding it to a list of previous arrays. * - * It will linearly create a new slab of the initial size when needed (instead of creating a new buffer and copying the data). - * After 10 slabs their size will increase exponentially (similar to {@link ByteArrayOutputStream} behavior) by making the new slab size the size of the existing data. + * Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become + * exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a + * max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the + * total size of this stream nears this max, this stream starts to grow linearly instead of exponentially. + * So new slabs are allocated to be 1/5th of the max capacity hint, + * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly + * twice the needed space when a new slab is added just before the stream is done being used. * - * When reusing a buffer it will adjust the slab size based on the previous data size ({@link CapacityByteArrayOutputStream#reset()}) + * When reusing this stream it will adjust the initial slab size based on the previous data size, aiming for fewer + * allocations, with the assumption that a similar amount of data will be written to this stream on re-use. + * See ({@link CapacityByteArrayOutputStream#reset()}). 
* * @author Julien Le Dem * */ public class CapacityByteArrayOutputStream extends OutputStream { private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class); + private static final byte[] EMPTY_SLAB = new byte[0]; - private static final int MINIMUM_SLAB_SIZE = 64 * 1024; - private static final int EXPONENTIAL_SLAB_SIZE_THRESHOLD = 10; + private int initialSlabSize; + private final int maxCapacityHint; + private final List<byte[]> slabs = new ArrayList<byte[]>(); - private int slabSize; - private List<byte[]> slabs = new ArrayList<byte[]>(); private byte[] currentSlab; - private int capacity; private int currentSlabIndex; - private int currentSlabPosition; - private int size; + private int bytesAllocated = 0; + private int bytesUsed = 0; /** - * @param initialSize the initialSize of the buffer (also slab size) + * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it + * will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to be + * a balance between the overhead of creating new slabs and wasting memory by eagerly making + * initial slabs too big. + * + * Note that targetCapacity here need not match maxCapacityHint in the constructor of + * CapacityByteArrayOutputStream, though often that would make sense. + * + * @param minSlabSize no matter what we shouldn't make slabs any smaller than this + * @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have? + * @param targetNumSlabs how many slabs should it take to reach targetCapacity? */ - public CapacityByteArrayOutputStream(int initialSize) { - Preconditions.checkArgument(initialSize > 0, "initialSize must be > 0"); - initSlabs(initialSize); + public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) { + // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times + // before reaching the targetCapacity + // eg for page size of 1MB we start at 1024 bytes. + // we also don't want to start too small, so we also apply a minimum. 
+ return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs)))); } - private void initSlabs(int initialSize) { - if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSize)); - this.slabSize = initialSize; - this.slabs.clear(); - this.capacity = initialSize; - this.currentSlab = new byte[slabSize]; - this.slabs.add(currentSlab); - this.currentSlabIndex = 0; - this.currentSlabPosition = 0; - this.size = 0; + /** + * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is + * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint + */ + public static CapacityByteArrayOutputStream withTargetNumSlabs( + int minSlabSize, int maxCapacityHint, int targetNumSlabs) { + + return new CapacityByteArrayOutputStream( + initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs), + maxCapacityHint); + } + + /** + * Defaults maxCapacityHint to 1MB + * @param initialSlabSize + * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)} + */ + @Deprecated + public CapacityByteArrayOutputStream(int initialSlabSize) { + this(initialSlabSize, 1024 * 1024); + } + + /** + * @param initialSlabSize the size to make the first slab + * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream + */ + public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) { + checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0"); + checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0"); + checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint)); + this.initialSlabSize = initialSlabSize; + this.maxCapacityHint = maxCapacityHint; + reset(); } + /** + * the new slab is guaranteed to be at least minimumSize + * @param minimumSize the size of the data we want to copy in the new slab + */ private void addSlab(int minimumSize) { - this.currentSlabIndex += 1; - if (currentSlabIndex < this.slabs.size()) { - // reuse existing slab - this.currentSlab = this.slabs.get(currentSlabIndex); - if (Log.DEBUG) LOG.debug(String.format("reusing slab of size %d", currentSlab.length)); - if (currentSlab.length < minimumSize) { - if (Log.DEBUG) LOG.debug(String.format("slab size %,d too small for value of size %,d. replacing slab", currentSlab.length, minimumSize)); - byte[] newSlab = new byte[minimumSize]; - capacity += minimumSize - currentSlab.length; - this.currentSlab = newSlab; - this.slabs.set(currentSlabIndex, newSlab); - } + int nextSlabSize; + + if (bytesUsed == 0) { + nextSlabSize = initialSlabSize; + } else if (bytesUsed > maxCapacityHint / 5) { + // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size + nextSlabSize = maxCapacityHint / 5; } else { - if (currentSlabIndex > EXPONENTIAL_SLAB_SIZE_THRESHOLD) { - // make slabs bigger in case we are creating too many of them - // double slab size every time. - this.slabSize = size; - if (Log.DEBUG) LOG.debug(String.format("used %d slabs, new slab size %d", currentSlabIndex, slabSize)); - } - if (slabSize < minimumSize) { - if (Log.DEBUG) LOG.debug(String.format("slab size %,d too small for value of size %,d. 
Bumping up slab size", slabSize, minimumSize)); - this.slabSize = minimumSize; - } - if (Log.DEBUG) LOG.debug(String.format("new slab of size %d", slabSize)); - this.currentSlab = new byte[slabSize]; - this.slabs.add(currentSlab); - this.capacity += slabSize; + // double the size every time + nextSlabSize = bytesUsed; + } + + if (nextSlabSize < minimumSize) { + if (Log.DEBUG) LOG.debug(format("slab size %,d too small for value of size %,d. Bumping up slab size", nextSlabSize, minimumSize)); + nextSlabSize = minimumSize; } - this.currentSlabPosition = 0; + + if (Log.DEBUG) LOG.debug(format("used %d slabs, adding new slab of size %d", slabs.size(), nextSlabSize)); + + this.currentSlab = new byte[nextSlabSize]; + this.slabs.add(currentSlab); + this.bytesAllocated += nextSlabSize; + this.currentSlabIndex = 0; } @Override public void write(int b) { - if (currentSlabPosition == currentSlab.length) { + if (currentSlabIndex == currentSlab.length) { addSlab(1); } - currentSlab[currentSlabPosition] = (byte) b; - currentSlabPosition += 1; - size += 1; + currentSlab[currentSlabIndex] = (byte) b; + currentSlabIndex += 1; + bytesUsed += 1; } @Override public void write(byte b[], int off, int len) { if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) { - throw new IndexOutOfBoundsException(); + throw new IndexOutOfBoundsException( + String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off)); } - if (currentSlabPosition + len >= currentSlab.length) { - final int length1 = currentSlab.length - currentSlabPosition; - System.arraycopy(b, off, currentSlab, currentSlabPosition, length1); + if (currentSlabIndex + len >= currentSlab.length) { + final int length1 = currentSlab.length - currentSlabIndex; + arraycopy(b, off, currentSlab, currentSlabIndex, length1); final int length2 = len - length1; addSlab(length2); - System.arraycopy(b, off + length1, currentSlab, currentSlabPosition, length2); - currentSlabPosition = length2; + arraycopy(b, off + length1, currentSlab, currentSlabIndex, length2); + currentSlabIndex = length2; } else { - System.arraycopy(b, off, currentSlab, currentSlabPosition, len); - currentSlabPosition += len; + arraycopy(b, off, currentSlab, currentSlabIndex, len); + currentSlabIndex += len; } - size += len; + bytesUsed += len; } /** @@ -142,71 +190,52 @@ public void write(byte b[], int off, int len) { * @exception IOException if an I/O error occurs. */ public void writeTo(OutputStream out) throws IOException { - for (int i = 0; i < currentSlabIndex; i++) { + for (int i = 0; i < slabs.size() - 1; i++) { final byte[] slab = slabs.get(i); - out.write(slab, 0, slab.length); + out.write(slab); } - out.write(currentSlab, 0, currentSlabPosition); + out.write(currentSlab, 0, currentSlabIndex); } /** - * @return the size of the allocated buffer + * @return The total size in bytes of data written to this stream. + */ + public long size() { + return bytesUsed; + } + + /** + * + * @return The total size in bytes currently allocated for this stream. */ public int getCapacity() { - return capacity; + return bytesAllocated; } /** * When re-using an instance with reset, it will adjust slab size based on previous data size. * The intent is to reuse the same instance for the same type of data (for example, the same column). - * The assumption is that the size in the buffer will be consistent. Otherwise we fall back to exponentialy double the slab size. 
- * <ul> - * <li>if we used less than half of the first slab (and it is above the minimum slab size), we will make the slab size smaller. - * <li>if we used more than the slab count threshold (10), we will re-adjust the slab size. - * </ul> - * if re-adjusting the slab size we will make it 1/5th of the previous used size in the buffer so that overhead of extra memory allocation is about 20% - * If we used less than the available slabs we free up the unused ones to reduce memory overhead. + * The assumption is that the size in the buffer will be consistent. */ public void reset() { - // heuristics to adjust slab size - if ( - // if we have only one slab, make sure it is not way too big (more than twice what we need). Except if the slab is already small - (currentSlabIndex == 0 && currentSlabPosition < currentSlab.length / 2 && currentSlab.length > MINIMUM_SLAB_SIZE) - || - // we want to avoid generating too many slabs. - (currentSlabIndex > EXPONENTIAL_SLAB_SIZE_THRESHOLD) - ){ - // readjust slab size - initSlabs(Math.max(size / 5, MINIMUM_SLAB_SIZE)); // should make overhead to about 20% without incurring many slabs - if (Log.DEBUG) LOG.debug(String.format("used %d slabs, new slab size %d", currentSlabIndex + 1, slabSize)); - } else if (currentSlabIndex < slabs.size() - 1) { - // free up the slabs that we are not using. We want to minimize overhead - this.slabs = new ArrayList<byte[]>(slabs.subList(0, currentSlabIndex + 1)); - this.capacity = 0; - for (byte[] slab : slabs) { - capacity += slab.length; - } - } + // readjust slab size. + // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size + this.initialSlabSize = max(bytesUsed / 7, initialSlabSize); + if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSlabSize)); + this.slabs.clear(); + this.bytesAllocated = 0; + this.bytesUsed = 0; + this.currentSlab = EMPTY_SLAB; this.currentSlabIndex = 0; - this.currentSlabPosition = 0; - this.currentSlab = slabs.get(currentSlabIndex); - this.size = 0; - } - - /** - * @return the size of the buffered data - */ - public long size() { - return size; } /** - * @return the index of the last value being written to this stream, which + * @return the index of the last value written to this stream, which * can be passed to {@link #setByte(long, byte)} in order to change it */ public long getCurrentIndex() { - Preconditions.checkArgument(size > 0, "This is an empty stream"); - return size - 1; + checkArgument(bytesUsed > 0, "This is an empty stream"); + return bytesUsed - 1; } /** @@ -216,11 +245,10 @@ public long getCurrentIndex() { * @param value the value to replace it with */ public void setByte(long index, byte value) { - Preconditions.checkArgument(index < size, - "Index: " + index + " is >= the current size of: " + size); + checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed); long seen = 0; - for (int i = 0; i <=currentSlabIndex; i++) { + for (int i = 0; i < slabs.size(); i++) { byte[] slab = slabs.get(i); if (index < seen + slab.length) { // ok found index @@ -236,11 +264,11 @@ public void setByte(long index, byte value) { * @return a text representation of the memory usage of this structure */ public String memUsageString(String prefix) { - return String.format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), getCapacity()); + return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), getCapacity()); } /** - * @return the total count of allocated 
slabs + * @return the total number of allocated slabs */ int getSlabCount() { return slabs.size(); diff --git a/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java new file mode 100644 index 0000000000..6a8437b934 --- /dev/null +++ b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package parquet.bytes; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import static java.lang.String.format; + +public class ConcatenatingByteArrayCollector extends BytesInput { + private final List<byte[]> slabs = new ArrayList<byte[]>(); + private long size = 0; + + public void collect(BytesInput bytesInput) throws IOException { + byte[] bytes = bytesInput.toByteArray(); + slabs.add(bytes); + size += bytes.length; + } + + public void reset() { + size = 0; + slabs.clear(); + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + for (byte[] slab : slabs) { + out.write(slab); + } + } + + @Override + public long size() { + return size; + } + + /** + * @param prefix a prefix to be used for every new line in the string + * @return a text representation of the memory usage of this structure + */ + public String memUsageString(String prefix) { + return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size); + } + +} diff --git a/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java index 72b52f5dfb..41006f1480 100644 --- a/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/test/java/parquet/bytes/TestCapacityByteArrayOutputStream.java @@ -32,7 +32,7 @@ public class TestCapacityByteArrayOutputStream { @Test public void testWrite() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); final int expectedSize = 54; for (int i = 0; i < expectedSize; i++) { capacityByteArrayOutputStream.write(i); @@ -43,7 +43,7 @@ public void testWrite() throws Throwable { @Test public void testWriteArray() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); int v = 23; writeArraysOf3(capacityByteArrayOutputStream, v); validate(capacityByteArrayOutputStream, v * 3); 
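Editor's note on the rewritten buffer above: the new CapacityByteArrayOutputStream grows by doubling its total capacity (each new slab is sized to the bytes already used) until usage passes maxCapacityHint / 5, after which slabs are capped at maxCapacityHint / 5, and initialSlabSizeHeuristic picks the first slab as targetCapacity / 2^targetNumSlabs with a floor of minSlabSize. The following standalone sketch (hypothetical class name SlabGrowthSketch; it is not part of the patch and only mirrors the arithmetic shown in the diff) lets the allocation pattern be inspected in isolation:

    // Standalone sketch, not the patched class: simulates the slab-growth policy
    // for a given initial slab size and max capacity hint.
    import java.util.ArrayList;
    import java.util.List;

    public class SlabGrowthSketch {

      // Mirrors initialSlabSizeHeuristic: halve targetCapacity targetNumSlabs times,
      // but never start smaller than minSlabSize.
      static int initialSlabSize(int minSlabSize, int targetCapacity, int targetNumSlabs) {
        return Math.max(minSlabSize, (int) (targetCapacity / Math.pow(2, targetNumSlabs)));
      }

      // Returns the slab sizes that would be allocated while writing totalBytes.
      static List<Integer> slabSizes(int initialSlabSize, int maxCapacityHint, long totalBytes) {
        List<Integer> slabs = new ArrayList<Integer>();
        long used = 0;
        while (used < totalBytes) {
          int next;
          if (used == 0) {
            next = initialSlabSize;          // first slab
          } else if (used > maxCapacityHint / 5) {
            next = maxCapacityHint / 5;      // grow linearly when nearing the hint
          } else {
            next = (int) used;               // otherwise double the total capacity
          }
          slabs.add(next);
          used += next;
        }
        return slabs;
      }

      public static void main(String[] args) {
        int pageSize = 1024 * 1024;
        // with a 1MB target and 10 target slabs the first slab is 1024 bytes
        int initial = initialSlabSize(64, pageSize, 10);
        System.out.println("initial slab: " + initial);
        System.out.println("slabs for a full page: " + slabSizes(initial, pageSize, pageSize));
      }
    }

With a 1 MB page and ten target slabs this starts at a 1 KB slab (as the in-code comment notes), so an empty or tiny column costs on the order of 1 KB rather than the 64 KB per buffer that motivated PARQUET-160.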
@@ -51,7 +51,7 @@ public void testWriteArray() throws Throwable { @Test public void testWriteArrayAndInt() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); for (int i = 0; i < 23; i++) { byte[] toWrite = { (byte)(i * 3), (byte)(i * 3 + 1)}; capacityByteArrayOutputStream.write(toWrite); @@ -62,9 +62,13 @@ public void testWriteArrayAndInt() throws Throwable { } + protected CapacityByteArrayOutputStream newCapacityBAOS(int initialSize) { + return new CapacityByteArrayOutputStream(10, 1000000); + } + @Test public void testReset() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); for (int i = 0; i < 54; i++) { capacityByteArrayOutputStream.write(i); assertEquals(i + 1, capacityByteArrayOutputStream.size()); @@ -83,7 +87,7 @@ public void testReset() throws Throwable { @Test public void testWriteArrayBiggerThanSlab() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); int v = 23; writeArraysOf3(capacityByteArrayOutputStream, v); int n = v * 3; @@ -109,7 +113,7 @@ public void testWriteArrayBiggerThanSlab() throws Throwable { @Test public void testWriteArrayManySlabs() throws Throwable { - CapacityByteArrayOutputStream capacityByteArrayOutputStream = new CapacityByteArrayOutputStream(10); + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10); int it = 500; int v = 23; for (int j = 0; j < it; j++) { @@ -137,7 +141,7 @@ public void testWriteArrayManySlabs() throws Throwable { public void testReplaceByte() throws Throwable { // test replace the first value { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); cbaos.write(10); assertEquals(0, cbaos.getCurrentIndex()); cbaos.setByte(0, (byte) 7); @@ -148,7 +152,7 @@ public void testReplaceByte() throws Throwable { // test replace value in the first slab { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); cbaos.write(10); cbaos.write(13); cbaos.write(15); @@ -163,7 +167,7 @@ public void testReplaceByte() throws Throwable { // test replace in *not* the first slab { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); // advance part way through the 3rd slab for (int i = 0; i < 12; i++) { @@ -181,7 +185,7 @@ public void testReplaceByte() throws Throwable { // test replace last value of a slab { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); // advance part way through the 3rd slab for (int i = 0; i < 12; i++) { @@ -199,7 +203,7 @@ public void testReplaceByte() throws Throwable { // test replace last value { - CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(5); + CapacityByteArrayOutputStream cbaos = newCapacityBAOS(5); // advance part way through the 3rd slab for (int i = 0; i < 12; i++) { diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java 
b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java index b3cdd656da..e3bab0d2ab 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,7 +19,9 @@ package parquet.hadoop; import static parquet.Log.INFO; +import static parquet.column.statistics.Statistics.getStatsBasedOnType; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -29,14 +31,13 @@ import parquet.Log; import parquet.bytes.BytesInput; -import parquet.bytes.CapacityByteArrayOutputStream; +import parquet.bytes.ConcatenatingByteArrayCollector; import parquet.column.ColumnDescriptor; import parquet.column.Encoding; import parquet.column.page.DictionaryPage; import parquet.column.page.PageWriteStore; import parquet.column.page.PageWriter; import parquet.column.statistics.Statistics; -import parquet.format.ColumnChunk; import parquet.format.converter.ParquetMetadataConverter; import parquet.hadoop.CodecFactory.BytesCompressor; import parquet.io.ParquetEncodingException; @@ -52,7 +53,8 @@ private static final class ColumnChunkPageWriter implements PageWriter { private final ColumnDescriptor path; private final BytesCompressor compressor; - private final CapacityByteArrayOutputStream buf; + private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream(); + private final ConcatenatingByteArrayCollector buf; private DictionaryPage dictionaryPage; private long uncompressedLength; @@ -64,11 +66,11 @@ private static final class ColumnChunkPageWriter implements PageWriter { private Statistics totalStatistics; - private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) { + private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) { this.path = path; this.compressor = compressor; - this.buf = new CapacityByteArrayOutputStream(initialSize); - this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType()); + this.buf = new ConcatenatingByteArrayCollector(); + this.totalStatistics = getStatsBasedOnType(this.path.getType()); } @Override @@ -91,6 +93,7 @@ public void writePage(BytesInput bytes, "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " + compressedSize); } + tempOutputStream.reset(); parquetMetadataConverter.writeDataPageHeader( (int)uncompressedSize, (int)compressedSize, @@ -99,13 +102,15 @@ public void writePage(BytesInput bytes, rlEncoding, dlEncoding, valuesEncoding, - buf); + tempOutputStream); this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; this.pageCount += 1; this.totalStatistics.mergeStatistics(statistics); - compressedBytes.writeAllTo(buf); + // by concatenating before collecting instead of collecting 
twice, + // we only allocate one buffer to copy into instead of multiple. + buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes)); encodings.add(rlEncoding); encodings.add(dlEncoding); encodings.add(valuesEncoding); @@ -127,21 +132,30 @@ public void writePageV2( int compressedSize = toIntWithCheck( compressedData.size() + repetitionLevels.size() + definitionLevels.size() ); + tempOutputStream.reset(); parquetMetadataConverter.writeDataPageV2Header( uncompressedSize, compressedSize, valueCount, nullCount, rowCount, statistics, dataEncoding, - rlByteLength, dlByteLength, - buf); + rlByteLength, + dlByteLength, + tempOutputStream); this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; this.pageCount += 1; this.totalStatistics.mergeStatistics(statistics); - repetitionLevels.writeAllTo(buf); - definitionLevels.writeAllTo(buf); - compressedData.writeAllTo(buf); + + // by concatenating before collecting instead of collecting twice, + // we only allocate one buffer to copy into instead of multiple. + buf.collect( + BytesInput.concat( + BytesInput.from(tempOutputStream), + repetitionLevels, + definitionLevels, + compressedData) + ); encodings.add(dataEncoding); } @@ -165,7 +179,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { writer.writeDictionaryPage(dictionaryPage); encodings.add(dictionaryPage.getEncoding()); } - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings)); + writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings)); writer.endColumn(); if (INFO) { LOG.info( @@ -183,7 +197,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { @Override public long allocatedSize() { - return buf.getCapacity(); + return buf.size(); } @Override @@ -206,10 +220,10 @@ public String memUsageString(String prefix) { private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>(); private final MessageType schema; - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) { + public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) { this.schema = schema; for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize)); + writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize)); } } diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java index 22a4e58cd2..66f2d200dd 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java @@ -43,7 +43,6 @@ class InternalParquetRecordWriter<T> { private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); - private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; @@ -99,22 +98,11 @@ public InternalParquetRecordWriter( } private void initStore() { - // we don't want this number to be too small - // ideally we divide the block equally across the columns - // it is unlikely all columns are going to be 
the same size. - // its value is likely below Integer.MAX_VALUE (2GB), although rowGroupSize is a long type. - // therefore this size is cast to int, since allocating byte array in under layer needs to - // limit the array size in an int scope. - int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5)); - pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); - // we don't want this number to be too small either - // ideally, slightly bigger than the page size, but not bigger than the block buffer - int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); + pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize); columnStore = parquetProperties.newColumnWriteStore( schema, pageStore, - pageSize, - initialPageBufferSize); + pageSize); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore)); } diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java index b1ec02e57e..28f6be2198 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.verify; import static parquet.column.Encoding.PLAIN; import static parquet.column.Encoding.RLE; import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER; @@ -44,11 +43,9 @@ import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Test; - import org.mockito.InOrder; import org.mockito.Mockito; -import org.mockito.internal.verification.VerificationModeFactory; -import org.mockito.verification.VerificationMode; + import parquet.bytes.BytesInput; import parquet.bytes.LittleEndianDataInputStream; import parquet.column.ColumnDescriptor; @@ -63,8 +60,6 @@ import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.MessageType; import parquet.schema.MessageTypeParser; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType; import parquet.schema.Types; public class TestColumnChunkPageWriteStore { diff --git a/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java index fc679b5b9f..a33790bd3a 100644 --- a/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java +++ b/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java @@ -59,7 +59,7 @@ public static void main(String[] args) throws Exception { MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema)); MemPageStore memPageStore = new MemPageStore(0); - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); write(memPageStore, columns, schema, pigSchema); columns.flush(); read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString); diff --git a/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java index 1083b0cead..e5edb37d1e 100644 --- a/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java +++ b/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java @@ -148,7 +148,7 @@ private <T extends TBase<?,?>> void validate(T expected) throws TException { final MessageType schema = schemaConverter.convert(thriftClass); LOG.info(schema); final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); - final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, 10000, false, WriterVersion.PARQUET_1_0); + final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false, WriterVersion.PARQUET_1_0); final RecordConsumer recordWriter = columnIO.getRecordWriter(columns); final StructType thriftType = schemaConverter.toStructType(thriftClass); ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType);
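A closing note on the parquet-hadoop side of the change: ColumnChunkPageWriter no longer accumulates pages in a CapacityByteArrayOutputStream. It writes each page header into a small reusable ByteArrayOutputStream, concatenates it with the already compressed page bytes, and hands the result to the new ConcatenatingByteArrayCollector, so only one byte[] is kept per page and no large slab is pre-allocated per column chunk. Below is a minimal sketch of that pattern, assuming only the BytesInput and ConcatenatingByteArrayCollector calls that appear in this patch; the class name PageCollectSketch and the writeHeader helper are hypothetical stand-ins for the real Thrift header serialization:

    // Minimal sketch of the collect-once page write path introduced by this patch.
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import parquet.bytes.BytesInput;
    import parquet.bytes.ConcatenatingByteArrayCollector;

    class PageCollectSketch {
      private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
      private final ConcatenatingByteArrayCollector buf = new ConcatenatingByteArrayCollector();

      void writePage(BytesInput compressedBytes) throws IOException {
        tempOutputStream.reset();
        writeHeader(tempOutputStream);  // header bytes land in the small temp buffer
        // Concatenate header + page before collecting, so the collector materializes
        // a single byte[] per page instead of copying into pre-sized slabs.
        buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
      }

      private void writeHeader(ByteArrayOutputStream out) throws IOException {
        out.write(new byte[] { 0 });  // placeholder; the real code writes a data page header
      }
    }

This is also why InternalParquetRecordWriter drops its initialBlockBufferSize / initialPageBufferSize arithmetic and simply passes pageSize through: the page and chunk buffers now size themselves from the data they actually receive.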