Make initial slab size heuristic into a helper method, apply in DictionaryValuesWriter as well

isnotinvain committed Feb 23, 2015
1 parent a257ee4 commit 61c0100
Showing 4 changed files with 46 additions and 10 deletions.

ColumnWriterV1.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import parquet.Log;
+import parquet.bytes.CapacityByteArrayOutputStream;
 import parquet.column.ColumnDescriptor;
 import parquet.column.ColumnWriter;
 import parquet.column.ParquetProperties;
@@ -76,10 +77,7 @@ public ColumnWriterV1(
     this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
     this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
 
-    // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
-    // eg for page size of 1MB we start at 1024 bytes.
-    // we also don't want to start too small, so we also apply a minimum.
-    int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSizeThreshold / pow(2, 10))));
+    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
     this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
   }
 

ColumnWriterV2.java
@@ -24,6 +24,7 @@
 import parquet.Ints;
 import parquet.Log;
 import parquet.bytes.BytesInput;
+import parquet.bytes.CapacityByteArrayOutputStream;
 import parquet.column.ColumnDescriptor;
 import parquet.column.ColumnWriter;
 import parquet.column.Encoding;
@@ -69,10 +70,7 @@ public ColumnWriterV2(
     this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
     this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
 
-    // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
-    // eg for page size of 1MB we start at 1024 bytes.
-    // we also don't want to start too small, so we also apply a minimum.
-    int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSize / pow(2, 10))));
+    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
     this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
   }
 
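
Aside (not part of the commit): both column writers previously inlined the same arithmetic that the new helper encapsulates, so the change above is intended to be behavior-preserving. The sketch below is a minimal check of that equivalence; the helper body is copied from the CapacityByteArrayOutputStream hunk further down, while the minimum slab size of 64 and the sample page size thresholds are illustrative assumptions, not values taken from this diff.

// Illustrative sketch only: the removed inline expression and the new helper
// compute the same initial slab size for any page size threshold.
import static java.lang.Math.max;
import static java.lang.Math.pow;

public class SlabSizeEquivalenceSketch {
  // copied from the initialSlabSizeHeuristic added to CapacityByteArrayOutputStream below
  static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
    return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
  }

  public static void main(String[] args) {
    int minSlabSize = 64; // assumed minimum, for illustration only
    int[] samplePageSizeThresholds = {64 * 1024, 1024 * 1024, 8 * 1024 * 1024};
    for (int pageSizeThreshold : samplePageSizeThresholds) {
      int oldStyle = max(minSlabSize, ((int) (pageSizeThreshold / pow(2, 10)))); // removed code
      int newStyle = initialSlabSizeHeuristic(minSlabSize, pageSizeThreshold, 10); // new code
      System.out.println(pageSizeThreshold + " -> " + oldStyle + " == " + newStyle); // always equal
    }
  }
}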

DictionaryValuesWriter.java
@@ -39,6 +39,7 @@
 import parquet.Log;
 import parquet.bytes.BytesInput;
 import parquet.bytes.BytesUtils;
+import parquet.bytes.CapacityByteArrayOutputStream;
 import parquet.column.Encoding;
 import parquet.column.page.DictionaryPage;
 import parquet.column.values.RequiresFallback;
@@ -62,6 +63,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
 
   /* max entries allowed for the dictionary will fail over to plain encoding if reached */
   private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
+  private static final int MIN_INITIAL_SLAB_SIZE = 64;
 
   /* encoding to label the data page */
   private final Encoding encodingForDataPage;
@@ -142,8 +144,12 @@ public BytesInput getBytes() {
     int maxDicId = getDictionarySize() - 1;
     if (DEBUG) LOG.debug("max dic id " + maxDicId);
     int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
-    // TODO: what is a good initialCapacity?
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64, maxDictionaryByteSize);
+
+    int initialSlabSize =
+        CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
+
+    RunLengthBitPackingHybridEncoder encoder =
+        new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
     IntIterator iterator = encodedValues.iterator();
     try {
       while (iterator.hasNext()) {
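
Aside (not part of the commit): the effect of the hunk above is that the RunLengthBitPackingHybridEncoder used for the data page no longer always starts with a fixed 64-byte slab; its initial slab now scales with maxDictionaryByteSize, clamped below by MIN_INITIAL_SLAB_SIZE. A minimal sketch with illustrative dictionary size limits (the constant mirrors the one added above; the sample sizes are assumptions):

// Illustrative sketch only: how the dictionary writer's initial slab size now varies
// with maxDictionaryByteSize (the removed code always passed 64 to the encoder).
public class DictionarySlabSizeSketch {
  static final int MIN_INITIAL_SLAB_SIZE = 64; // mirrors the constant added in the hunk above

  // copied from the initialSlabSizeHeuristic added to CapacityByteArrayOutputStream below
  static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
    return Math.max(minSlabSize, (int) (targetCapacity / Math.pow(2, targetNumSlabs)));
  }

  public static void main(String[] args) {
    int[] sampleMaxDictionaryByteSizes = {16 * 1024, 1024 * 1024}; // assumed sample values
    for (int maxDictionaryByteSize : sampleMaxDictionaryByteSizes) {
      int initialSlabSize =
          initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
      System.out.println(maxDictionaryByteSize + " -> " + initialSlabSize); // prints 64, then 1024
    }
  }
}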

CapacityByteArrayOutputStream.java
@@ -16,6 +16,7 @@
 package parquet.bytes;
 
 import static java.lang.Math.max;
+import static java.lang.Math.pow;
 import static java.lang.String.format;
 import static java.lang.System.arraycopy;
 import static parquet.Preconditions.checkArgument;
@@ -61,6 +62,39 @@ public class CapacityByteArrayOutputStream extends OutputStream {
   private int bytesAllocated = 0;
   private int bytesUsed = 0;
 
+  /**
+   * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
+   * will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to strike
+   * a balance between the overhead of creating new slabs and wasting memory by eagerly making
+   * the initial slabs too big.
+   *
+   * Note that targetCapacity here need not match maxCapacityHint in the constructor of
+   * CapacityByteArrayOutputStream, though often that would make sense.
+   *
+   * @param minSlabSize no matter what, we shouldn't make slabs any smaller than this
+   * @param targetCapacity after we've allocated targetNumSlabs, how much capacity should we have?
+   * @param targetNumSlabs how many slabs should it take to reach targetCapacity?
+   */
+  public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
+    // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times
+    // before reaching the targetCapacity
+    // e.g. for a targetCapacity of 1MB and targetNumSlabs of 10, we start at 1024 bytes.
+    // we also don't want to start too small, so we also apply a minimum.
+    return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
+  }
+
+  /**
+   * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
+   * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint.
+   */
+  public static CapacityByteArrayOutputStream withTargetNumSlabs(
+      int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+
+    return new CapacityByteArrayOutputStream(
+        initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
+        maxCapacityHint);
+  }
+
   /**
    * Defaults maxCapacityHint to 1MB
    * @param initialSlabSize
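
Aside (not part of the commit): a worked example of the two properties the comments above describe, namely that the returned slab size doubles targetNumSlabs times before reaching targetCapacity, and that very small targets fall back to minSlabSize. The new withTargetNumSlabs factory simply feeds this value into the existing (initialSlabSize, maxCapacityHint) constructor. The helper body below is copied from the hunk above; the concrete numbers are illustrative.

// Illustrative sketch only: properties of initialSlabSizeHeuristic.
public class HeuristicPropertiesSketch {
  static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
    return Math.max(minSlabSize, (int) (targetCapacity / Math.pow(2, targetNumSlabs)));
  }

  public static void main(String[] args) {
    int targetCapacity = 1024 * 1024; // 1MB, matching the "1MB" example in the comment
    int slab = initialSlabSizeHeuristic(64, targetCapacity, 10);
    System.out.println(slab);                            // 1024
    System.out.println((slab << 10) == targetCapacity);  // true: 10 doublings reach the target

    // small targets are clamped to the minimum slab size
    System.out.println(initialSlabSizeHeuristic(64, 4096, 10)); // 64 (4096 / 2^10 is only 4)
  }
}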
