Skip to content

Commit

Permalink
PARQUET-160: avoid wasting 64K per empty buffer.
Browse files Browse the repository at this point in the history
CapacityByteArrayOutputStream initializes itself to a default size (64K) when instantiated.
This leads to many allocated-but-unused buffers when there are many empty columns.

Author: Alex Levenson <[email protected]>
Author: julien <[email protected]>
Author: Julien Le Dem <[email protected]>

Closes #98 from julienledem/avoid_wasting_64K_per_empty_buffer and squashes the following commits:

b0200dd [julien] add license
a1b278e [julien] Merge branch 'master' into avoid_wasting_64K_per_empty_buffer
5304ee1 [julien] remove unused constant
81e399f [julien] Merge branch 'avoid_wasting_64K_per_empty_buffer' of github.com:julienledem/incubator-parquet-mr into avoid_wasting_64K_per_empty_buffer
ccf677d [julien] Merge branch 'master' into avoid_wasting_64K_per_empty_buffer
37148d6 [Julien Le Dem] Merge pull request #2 from isnotinvain/PR-98
b9abab0 [Alex Levenson] Address Julien's comment
965af7f [Alex Levenson] one more typo
9939d8d [Alex Levenson] fix typos in comments
61c0100 [Alex Levenson] Make initial slab size heuristic into a helper method, apply in DictionaryValuesWriter as well
a257ee4 [Alex Levenson] Improve IndexOutOfBoundsException message
64d6c7f [Alex Levenson] update comments
8b54667 [Alex Levenson] Don't use CapacityByteArrayOutputStream for writing page chunks
6a20e8b [Alex Levenson] Remove initialSlabSize decision from InternalParquetRecordReader, use a simpler heuristic in the column writers instead
3a0f8e4 [Alex Levenson] Use simpler settings for column chunk writer
b2736a1 [Alex Levenson] Some cleanup in CapacityByteArrayOutputStream
1df4a71 [julien] refactor CapacityByteArray to be aware of page size
95c8fb6 [julien] avoid wasting 64K per empty buffer.
  • Loading branch information
isnotinvain committed Mar 5, 2015
1 parent fa8957d commit d084ad2
Show file tree
Hide file tree
Showing 44 changed files with 430 additions and 318 deletions.
47 changes: 24 additions & 23 deletions parquet-column/src/main/java/parquet/column/ParquetProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,29 @@ public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean
this.enableDictionary = enableDict;
}

public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol) {
public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
if (maxLevel == 0) {
return new DevNullValuesWriter();
} else {
return new RunLengthBitPackingHybridValuesWriter(
getWidthFromMaxInt(maxLevel), initialSizePerCol);
getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
}
}

private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol) {
private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch (path.getType()) {
case BOOLEAN:
return new BooleanPlainValuesWriter();
case INT96:
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
case FIXED_LEN_BYTE_ARRAY:
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
case BINARY:
case INT32:
case INT64:
case DOUBLE:
case FLOAT:
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand Down Expand Up @@ -146,24 +146,24 @@ private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initi
}
}

private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol) {
private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch(writerVersion) {
case PARQUET_1_0:
return plainWriter(path, initialSizePerCol);
return plainWriter(path, initialSizePerCol, pageSize);
case PARQUET_2_0:
switch (path.getType()) {
case BOOLEAN:
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol);
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return new DeltaByteArrayWriter(initialSizePerCol);
return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
case INT32:
return new DeltaBinaryPackingValuesWriter(initialSizePerCol);
return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
case INT96:
case INT64:
case DOUBLE:
case FLOAT:
return plainWriter(path, initialSizePerCol);
return plainWriter(path, initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand All @@ -172,8 +172,8 @@ private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePe
}
}

private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol);
private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
if (enableDictionary) {
return FallbackValuesWriter.of(
dictionaryWriter(path, initialSizePerCol),
Expand All @@ -183,24 +183,24 @@ private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSi
}
}

public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) {
public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch (path.getType()) {
case BOOLEAN: // no dictionary encoding for boolean
return writerToFallbackTo(path, initialSizePerCol);
return writerToFallbackTo(path, initialSizePerCol, pageSize);
case FIXED_LEN_BYTE_ARRAY:
// dictionary encoding for that type was not enabled in PARQUET 1.0
if (writerVersion == WriterVersion.PARQUET_2_0) {
return dictWriterWithFallBack(path, initialSizePerCol);
return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
} else {
return writerToFallbackTo(path, initialSizePerCol);
return writerToFallbackTo(path, initialSizePerCol, pageSize);
}
case BINARY:
case INT32:
case INT64:
case INT96:
case DOUBLE:
case FLOAT:
return dictWriterWithFallBack(path, initialSizePerCol);
return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand All @@ -220,19 +220,20 @@ public boolean isEnableDictionary() {

public ColumnWriteStore newColumnWriteStore(
MessageType schema,
PageWriteStore pageStore, int pageSize,
int initialPageBufferSize) {
PageWriteStore pageStore,
int pageSize) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(
pageStore,
pageSize, initialPageBufferSize, dictionaryPageSizeThreshold,
pageSize,
dictionaryPageSizeThreshold,
enableDictionary, writerVersion);
case PARQUET_2_0:
return new ColumnWriteStoreV2(
schema,
pageStore,
pageSize, initialPageBufferSize,
pageSize,
new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
private final int pageSizeThreshold;
private final int dictionaryPageSizeThreshold;
private final boolean enableDictionary;
private final int initialSizePerCol;
private final WriterVersion writerVersion;

public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
super();
this.pageWriteStore = pageWriteStore;
this.pageSizeThreshold = pageSizeThreshold;
this.initialSizePerCol = initialSizePerCol;
this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
this.enableDictionary = enableDictionary;
this.writerVersion = writerVersion;
Expand All @@ -67,7 +65,7 @@ public Set<ColumnDescriptor> getColumnDescriptors() {

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
public ColumnWriteStoreV2(
MessageType schema,
PageWriteStore pageWriteStore,
int pageSizeThreshold, int initialSizePerCol,
int pageSizeThreshold,
ParquetProperties parquetProps) {
super();
this.pageSizeThreshold = pageSizeThreshold;
this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps));
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
}
this.columns = unmodifiableMap(mcolumns);
this.writers = this.columns.values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;

import parquet.Log;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnWriter;
import parquet.column.ParquetProperties;
Expand All @@ -34,6 +35,9 @@
import parquet.io.ParquetEncodingException;
import parquet.io.api.Binary;

import static java.lang.Math.max;
import static java.lang.Math.pow;

/**
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
*
Expand All @@ -44,6 +48,7 @@ final class ColumnWriterV1 implements ColumnWriter {
private static final Log LOG = Log.getLog(ColumnWriterV1.class);
private static final boolean DEBUG = Log.DEBUG;
private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
private static final int MIN_SLAB_SIZE = 64;

private final ColumnDescriptor path;
private final PageWriter pageWriter;
Expand All @@ -60,7 +65,6 @@ public ColumnWriterV1(
ColumnDescriptor path,
PageWriter pageWriter,
int pageSizeThreshold,
int initialSizePerCol,
int dictionaryPageSizeThreshold,
boolean enableDictionary,
WriterVersion writerVersion) {
Expand All @@ -72,9 +76,12 @@ public ColumnWriterV1(
resetStatistics();

ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);

this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);

int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package parquet.column.impl;

import static java.lang.Math.max;
import static java.lang.Math.pow;
import static parquet.bytes.BytesUtils.getWidthFromMaxInt;

import java.io.IOException;

import parquet.Ints;
import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnWriter;
import parquet.column.Encoding;
Expand All @@ -46,6 +49,7 @@
final class ColumnWriterV2 implements ColumnWriter {
private static final Log LOG = Log.getLog(ColumnWriterV2.class);
private static final boolean DEBUG = Log.DEBUG;
private static final int MIN_SLAB_SIZE = 64;

private final ColumnDescriptor path;
private final PageWriter pageWriter;
Expand All @@ -60,14 +64,17 @@ final class ColumnWriterV2 implements ColumnWriter {
public ColumnWriterV2(
ColumnDescriptor path,
PageWriter pageWriter,
int initialSizePerCol,
ParquetProperties parquetProps) {
ParquetProperties parquetProps,
int pageSize) {
this.path = path;
this.pageWriter = pageWriter;
resetStatistics();
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);

this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);

int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ public class BitPackingValuesWriter extends ValuesWriter {

/**
* @param bound the maximum value stored by this column
* @param pageSize
*/
public BitPackingValuesWriter(int bound, int initialCapacity) {
public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) {
this.bitsPerValue = getWidthFromMaxInt(bound);
this.out = new CapacityByteArrayOutputStream(initialCapacity);
this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
init();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class BitWriter {
}
}

public BitWriter(int initialCapacity) {
this.baos = new CapacityByteArrayOutputStream(initialCapacity);
public BitWriter(int initialCapacity, int pageSize) {
this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
}

public void writeBit(boolean bit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static ValuesReader getBoundedReader(int bound) {
return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound);
}

public static ValuesWriter getBoundedWriter(int bound, int initialCapacity) {
return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity);
public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) {
return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ class BoundedIntValuesWriter extends ValuesWriter {
}
}

public BoundedIntValuesWriter(int bound, int initialCapacity) {
public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) {
if (bound == 0) {
throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead.");
}
this.bitWriter = new BitWriter(initialCapacity);
this.bitWriter = new BitWriter(initialCapacity, pageSize);
bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2));
shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue);
if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
* reused between flushes.
*/
public static final int MAX_BITWIDTH = 32;

public static final int DEFAULT_NUM_BLOCK_VALUES = 128;

public static final int DEFAULT_NUM_MINIBLOCKS = 4;

private final CapacityByteArrayOutputStream baos;
Expand Down Expand Up @@ -110,17 +110,17 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
* it will be reset after each flush
*/
private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
public DeltaBinaryPackingValuesWriter(int slabSize) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize);

public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize);
}

public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize) {
public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) {
this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
bitWidths = new int[config.miniBlockNumInABlock];
deltaBlockBuffer = new int[blockSizeInValues];
miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
baos = new CapacityByteArrayOutputStream(slabSize);
baos = new CapacityByteArrayOutputStream(slabSize, pageSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* <pre>
* {@code
* delta-length-byte-array : length* byte-array*
* }
* }
* </pre>
* @author Aniket Mokashi
*
Expand All @@ -48,13 +48,13 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;

public DeltaLengthByteArrayValuesWriter(int initialSize) {
arrayOut = new CapacityByteArrayOutputStream(initialSize);
public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) {
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
out = new LittleEndianDataOutputStream(arrayOut);
lengthWriter = new DeltaBinaryPackingValuesWriter(
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
initialSize);
initialSize, pageSize);
}

@Override
Expand Down Expand Up @@ -101,6 +101,6 @@ public long getAllocatedSize() {

@Override
public String memUsageString(String prefix) {
return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
}
}
Loading

0 comments on commit d084ad2

Please sign in to comment.