Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-160: avoid wasting 64K per empty buffer. #98

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions parquet-column/src/main/java/parquet/column/ParquetProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,29 @@ public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean
this.enableDictionary = enableDict;
}

public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol) {
public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
if (maxLevel == 0) {
return new DevNullValuesWriter();
} else {
return new RunLengthBitPackingHybridValuesWriter(
getWidthFromMaxInt(maxLevel), initialSizePerCol);
getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
}
}

private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol) {
private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch (path.getType()) {
case BOOLEAN:
return new BooleanPlainValuesWriter();
case INT96:
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
case FIXED_LEN_BYTE_ARRAY:
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
case BINARY:
case INT32:
case INT64:
case DOUBLE:
case FLOAT:
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand Down Expand Up @@ -128,24 +128,24 @@ private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initi
}
}

private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol) {
private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch(writerVersion) {
case PARQUET_1_0:
return plainWriter(path, initialSizePerCol);
return plainWriter(path, initialSizePerCol, pageSize);
case PARQUET_2_0:
switch (path.getType()) {
case BOOLEAN:
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol);
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return new DeltaByteArrayWriter(initialSizePerCol);
return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
case INT32:
return new DeltaBinaryPackingValuesWriter(initialSizePerCol);
return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
case INT96:
case INT64:
case DOUBLE:
case FLOAT:
return plainWriter(path, initialSizePerCol);
return plainWriter(path, initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand All @@ -154,8 +154,8 @@ private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePe
}
}

private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol);
private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
if (enableDictionary) {
return FallbackValuesWriter.of(
dictionaryWriter(path, initialSizePerCol),
Expand All @@ -165,24 +165,24 @@ private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSi
}
}

public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) {
public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch (path.getType()) {
case BOOLEAN: // no dictionary encoding for boolean
return writerToFallbackTo(path, initialSizePerCol);
return writerToFallbackTo(path, initialSizePerCol, pageSize);
case FIXED_LEN_BYTE_ARRAY:
// dictionary encoding for that type was not enabled in PARQUET 1.0
if (writerVersion == WriterVersion.PARQUET_2_0) {
return dictWriterWithFallBack(path, initialSizePerCol);
return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
} else {
return writerToFallbackTo(path, initialSizePerCol);
return writerToFallbackTo(path, initialSizePerCol, pageSize);
}
case BINARY:
case INT32:
case INT64:
case INT96:
case DOUBLE:
case FLOAT:
return dictWriterWithFallBack(path, initialSizePerCol);
return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ColumnWriteStoreV2(
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps));
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps, pageSizeThreshold));
}
this.columns = unmodifiableMap(mcolumns);
this.writers = this.columns.values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public ColumnWriterV1(
resetStatistics();

ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol, pageSizeThreshold);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol, pageSizeThreshold);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSizeThreshold);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ public ColumnWriterV2(
ColumnDescriptor path,
PageWriter pageWriter,
int initialSizePerCol,
ParquetProperties parquetProps) {
ParquetProperties parquetProps,
int pageSize) {
this.path = path;
this.pageWriter = pageWriter;
resetStatistics();
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol, pageSize);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol, pageSize);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should tweak the initialSize here.
levels should get a tiny initial size (100 bytes?) in case they are always null or always defined.

this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSize);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ public class BitPackingValuesWriter extends ValuesWriter {

/**
* @param bound the maximum value stored by this column
* @param pageSize
*/
public BitPackingValuesWriter(int bound, int initialCapacity) {
public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) {
this.bitsPerValue = getWidthFromMaxInt(bound);
this.out = new CapacityByteArrayOutputStream(initialCapacity);
this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
init();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class BitWriter {
}
}

public BitWriter(int initialCapacity) {
this.baos = new CapacityByteArrayOutputStream(initialCapacity);
public BitWriter(int initialCapacity, int pageSize) {
this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
}

public void writeBit(boolean bit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static ValuesReader getBoundedReader(int bound) {
return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound);
}

public static ValuesWriter getBoundedWriter(int bound, int initialCapacity) {
return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity);
public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) {
return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ class BoundedIntValuesWriter extends ValuesWriter {
}
}

public BoundedIntValuesWriter(int bound, int initialCapacity) {
public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) {
if (bound == 0) {
throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead.");
}
this.bitWriter = new BitWriter(initialCapacity);
this.bitWriter = new BitWriter(initialCapacity, pageSize);
bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2));
shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue);
if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
* reused between flushes.
*/
public static final int MAX_BITWIDTH = 32;

public static final int DEFAULT_NUM_BLOCK_VALUES = 128;

public static final int DEFAULT_NUM_MINIBLOCKS = 4;

private final CapacityByteArrayOutputStream baos;
Expand Down Expand Up @@ -107,17 +107,17 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
* it will be reset after each flush
*/
private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
public DeltaBinaryPackingValuesWriter(int slabSize) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize);

public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize);
}

public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize) {
public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) {
this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
bitWidths = new int[config.miniBlockNumInABlock];
deltaBlockBuffer = new int[blockSizeInValues];
miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
baos = new CapacityByteArrayOutputStream(slabSize);
baos = new CapacityByteArrayOutputStream(slabSize, pageSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* <pre>
* {@code
* delta-length-byte-array : length* byte-array*
* }
* }
* </pre>
* @author Aniket Mokashi
*
Expand All @@ -45,13 +45,13 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;

public DeltaLengthByteArrayValuesWriter(int initialSize) {
arrayOut = new CapacityByteArrayOutputStream(initialSize);
public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) {
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
out = new LittleEndianDataOutputStream(arrayOut);
lengthWriter = new DeltaBinaryPackingValuesWriter(
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
initialSize);
initialSize, pageSize);
}

@Override
Expand Down Expand Up @@ -98,6 +98,6 @@ public long getAllocatedSize() {

@Override
public String memUsageString(String prefix) {
return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* <pre>
* {@code
* delta-length-byte-array : prefix-length* suffixes*
* }
* }
* </pre>
* @author Aniket Mokashi
*
Expand All @@ -38,9 +38,9 @@ public class DeltaByteArrayWriter extends ValuesWriter{
private ValuesWriter suffixWriter;
private byte[] previous;

public DeltaByteArrayWriter(int initialCapacity) {
this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity);
this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity);
public DeltaByteArrayWriter(int initialCapacity, int pageSize) {
this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize);
this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize);
this.previous = new byte[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public BytesInput getBytes() {
if (DEBUG) LOG.debug("max dic id " + maxDicId);
int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
// TODO: what is a good initialCapacity?
RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024);
RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024, maxDictionaryByteSize);
IntIterator iterator = encodedValues.iterator();
try {
while (iterator.hasNext()) {
Expand Down Expand Up @@ -237,7 +237,7 @@ public void writeBytes(Binary v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -312,7 +312,7 @@ public void writeBytes(Binary value) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize);
FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize);
Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -357,7 +357,7 @@ public void writeLong(long v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
LongIterator longIterator = longDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -429,7 +429,7 @@ public void writeDouble(double v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -501,7 +501,7 @@ public void writeInteger(int v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
it.unimi.dsi.fastutil.ints.IntIterator intIterator = intDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -573,7 +573,7 @@ public void writeFloat(float v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
FloatIterator floatIterator = floatDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
*/
public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
private static final Log LOG = Log.getLog(PlainValuesWriter.class);

private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;
private int length;
public FixedLenByteArrayPlainValuesWriter(int length, int initialSize) {

public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) {
this.length = length;
this.arrayOut = new CapacityByteArrayOutputStream(initialSize);
this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
this.out = new LittleEndianDataOutputStream(arrayOut);
}

Expand All @@ -56,7 +56,7 @@ public final void writeBytes(Binary v) {
throw new ParquetEncodingException("could not write fixed bytes", e);
}
}

@Override
public long getBufferedSize() {
return arrayOut.size();
Expand All @@ -72,7 +72,7 @@ public BytesInput getBytes() {
if (Log.DEBUG) LOG.debug("writing a buffer of size " + arrayOut.size());
return BytesInput.from(arrayOut);
}

@Override
public void reset() {
arrayOut.reset();
Expand All @@ -82,7 +82,7 @@ public void reset() {
public long getAllocatedSize() {
return arrayOut.getCapacity();
}

@Override
public Encoding getEncoding() {
return Encoding.PLAIN;
Expand Down
Loading