From 61c01002800a1160ef5b3979659b46a8a473a711 Mon Sep 17 00:00:00 2001
From: Alex Levenson
Date: Mon, 23 Feb 2015 13:58:02 -0800
Subject: [PATCH] Make initial slab size heuristic into a helper method, apply
 in DictionaryValuesWriter as well

---
 .../parquet/column/impl/ColumnWriterV1.java    |  6 ++--
 .../parquet/column/impl/ColumnWriterV2.java    |  6 ++--
 .../dictionary/DictionaryValuesWriter.java     | 10 ++++--
 .../bytes/CapacityByteArrayOutputStream.java   | 34 +++++++++++++++++++
 4 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
index f3d8181369..fdca2f5502 100644
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import parquet.Log;
+import parquet.bytes.CapacityByteArrayOutputStream;
 import parquet.column.ColumnDescriptor;
 import parquet.column.ColumnWriter;
 import parquet.column.ParquetProperties;
@@ -76,10 +77,7 @@ public ColumnWriterV1(
     this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
     this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
 
-    // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
-    // eg for page size of 1MB we start at 1024 bytes.
-    // we also don't want to start too small, so we also apply a minimum.
-    int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSizeThreshold / pow(2, 10))));
+    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
     this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
   }
 
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
index e29a786cbc..df1075a499 100644
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
@@ -24,6 +24,7 @@
 import parquet.Ints;
 import parquet.Log;
 import parquet.bytes.BytesInput;
+import parquet.bytes.CapacityByteArrayOutputStream;
 import parquet.column.ColumnDescriptor;
 import parquet.column.ColumnWriter;
 import parquet.column.Encoding;
@@ -69,10 +70,7 @@ public ColumnWriterV2(
     this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
     this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
 
-    // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
-    // eg for page size of 1MB we start at 1024 bytes.
-    // we also don't want to start too small, so we also apply a minimum.
-    int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSize / pow(2, 10))));
+    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
     this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
   }
 
diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
index 25718117c0..8fca4fd613 100644
--- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -39,6 +39,7 @@
 import parquet.Log;
 import parquet.bytes.BytesInput;
 import parquet.bytes.BytesUtils;
+import parquet.bytes.CapacityByteArrayOutputStream;
 import parquet.column.Encoding;
 import parquet.column.page.DictionaryPage;
 import parquet.column.values.RequiresFallback;
@@ -62,6 +63,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
 
   /* max entries allowed for the dictionary will fail over to plain encoding if reached */
   private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
+  private static final int MIN_INITIAL_SLAB_SIZE = 64;
 
   /* encoding to label the data page */
   private final Encoding encodingForDataPage;
@@ -142,8 +144,12 @@ public BytesInput getBytes() {
     int maxDicId = getDictionarySize() - 1;
     if (DEBUG) LOG.debug("max dic id " + maxDicId);
     int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
-    // TODO: what is a good initialCapacity?
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64, maxDictionaryByteSize);
+
+    int initialSlabSize =
+        CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
+
+    RunLengthBitPackingHybridEncoder encoder =
+        new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
     IntIterator iterator = encodedValues.iterator();
     try {
       while (iterator.hasNext()) {
diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
index b0744fca91..70100efb0a 100644
--- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -16,6 +16,7 @@
 package parquet.bytes;
 
 import static java.lang.Math.max;
+import static java.lang.Math.pow;
 import static java.lang.String.format;
 import static java.lang.System.arraycopy;
 import static parquet.Preconditions.checkArgument;
@@ -61,6 +62,39 @@ public class CapacityByteArrayOutputStream extends OutputStream {
 
   private int bytesAllocated = 0;
   private int bytesUsed = 0;
+  /**
+   * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
+   * will end up allocating targetNumSlabs slabs in order to reach targetCapacity. This aims to
+   * be a balance between the overhead of creating new slabs and wasting memory by eagerly
+   * making initial slabs too big.
+   *
+   * Note that targetCapacity here need not match maxCapacityHint in the constructor of
+   * CapacityByteArrayOutputStream, though often that would make sense.
+   *
+   * @param minSlabSize no matter what, we shouldn't make slabs any smaller than this
+   * @param targetCapacity how much capacity we should have once targetNumSlabs slabs have been allocated
+   * @param targetNumSlabs how many slabs it should take to reach targetCapacity
+   */
+  public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
+    // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times
+    // before reaching the targetCapacity
+    // e.g. for a target capacity of 1MB and targetNumSlabs of 10, we start at 1024 bytes.
+    // we also don't want to start too small, so we also apply a minimum.
+    return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
+  }
+
+  /**
+   * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
+   * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint
+   */
+  public static CapacityByteArrayOutputStream withTargetNumSlabs(
+      int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+
+    return new CapacityByteArrayOutputStream(
+        initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
+        maxCapacityHint);
+  }
+
  /**
   * Defaults maxCapacityHint to 1MB
   * @param initialSlabSize
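
A quick worked example of the heuristic above (a standalone sketch, not part of the patch: the class name SlabSizeDemo and the capacity-doubling loop are illustrative assumptions about how CapacityByteArrayOutputStream grows; the arithmetic mirrors initialSlabSizeHeuristic as written in the diff):

    import static java.lang.Math.max;
    import static java.lang.Math.pow;

    public class SlabSizeDemo {

      // Same arithmetic as initialSlabSizeHeuristic in the patch: start at
      // targetCapacity / 2^targetNumSlabs, but never below minSlabSize.
      static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
        return max(minSlabSize, (int) (targetCapacity / pow(2, targetNumSlabs)));
      }

      public static void main(String[] args) {
        int pageSize = 1024 * 1024; // a 1MB page size threshold
        int slab = initialSlabSizeHeuristic(64, pageSize, 10);
        System.out.println(slab); // prints 1024, i.e. 1MB / 2^10

        // If total capacity roughly doubles each time a new slab is added,
        // reaching the page size from a 1024-byte start takes 10 doublings,
        // which is the targetNumSlabs balance the heuristic aims for.
        int capacity = slab;
        int doublings = 0;
        while (capacity < pageSize) {
          capacity *= 2;
          doublings++;
        }
        System.out.println(doublings); // prints 10

        // For small targets the minimum wins: a 16KB dictionary page gives
        // 16384 / 2^10 = 16, so the 64-byte floor applies instead.
        System.out.println(initialSlabSizeHeuristic(64, 16 * 1024, 10)); // prints 64
      }
    }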