Skip to content

PARQUET-160: avoid wasting 64K per empty buffer. #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions parquet-column/src/main/java/parquet/column/ParquetProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,29 @@ public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean
this.enableDictionary = enableDict;
}

public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol) {
public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
if (maxLevel == 0) {
return new DevNullValuesWriter();
} else {
return new RunLengthBitPackingHybridValuesWriter(
getWidthFromMaxInt(maxLevel), initialSizePerCol);
getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
}
}

private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol) {
private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch (path.getType()) {
case BOOLEAN:
return new BooleanPlainValuesWriter();
case INT96:
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
case FIXED_LEN_BYTE_ARRAY:
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
case BINARY:
case INT32:
case INT64:
case DOUBLE:
case FLOAT:
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand Down Expand Up @@ -128,24 +128,24 @@ private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initi
}
}

private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol) {
private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch(writerVersion) {
case PARQUET_1_0:
return plainWriter(path, initialSizePerCol);
return plainWriter(path, initialSizePerCol, pageSize);
case PARQUET_2_0:
switch (path.getType()) {
case BOOLEAN:
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol);
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return new DeltaByteArrayWriter(initialSizePerCol);
return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
case INT32:
return new DeltaBinaryPackingValuesWriter(initialSizePerCol);
return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
case INT96:
case INT64:
case DOUBLE:
case FLOAT:
return plainWriter(path, initialSizePerCol);
return plainWriter(path, initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand All @@ -154,8 +154,8 @@ private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePe
}
}

private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol);
private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
if (enableDictionary) {
return FallbackValuesWriter.of(
dictionaryWriter(path, initialSizePerCol),
Expand All @@ -165,24 +165,24 @@ private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSi
}
}

public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) {
public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch (path.getType()) {
case BOOLEAN: // no dictionary encoding for boolean
return writerToFallbackTo(path, initialSizePerCol);
return writerToFallbackTo(path, initialSizePerCol, pageSize);
case FIXED_LEN_BYTE_ARRAY:
// dictionary encoding for that type was not enabled in PARQUET 1.0
if (writerVersion == WriterVersion.PARQUET_2_0) {
return dictWriterWithFallBack(path, initialSizePerCol);
return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
} else {
return writerToFallbackTo(path, initialSizePerCol);
return writerToFallbackTo(path, initialSizePerCol, pageSize);
}
case BINARY:
case INT32:
case INT64:
case INT96:
case DOUBLE:
case FLOAT:
return dictWriterWithFallBack(path, initialSizePerCol);
return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand All @@ -202,19 +202,20 @@ public boolean isEnableDictionary() {

public ColumnWriteStore newColumnWriteStore(
MessageType schema,
PageWriteStore pageStore, int pageSize,
int initialPageBufferSize) {
PageWriteStore pageStore,
int pageSize) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(
pageStore,
pageSize, initialPageBufferSize, dictionaryPageSizeThreshold,
pageSize,
dictionaryPageSizeThreshold,
enableDictionary, writerVersion);
case PARQUET_2_0:
return new ColumnWriteStoreV2(
schema,
pageStore,
pageSize, initialPageBufferSize,
pageSize,
new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
private final int pageSizeThreshold;
private final int dictionaryPageSizeThreshold;
private final boolean enableDictionary;
private final int initialSizePerCol;
private final WriterVersion writerVersion;

public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
super();
this.pageWriteStore = pageWriteStore;
this.pageSizeThreshold = pageSizeThreshold;
this.initialSizePerCol = initialSizePerCol;
this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
this.enableDictionary = enableDictionary;
this.writerVersion = writerVersion;
Expand All @@ -64,7 +62,7 @@ public Set<ColumnDescriptor> getColumnDescriptors() {

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
public ColumnWriteStoreV2(
MessageType schema,
PageWriteStore pageWriteStore,
int pageSizeThreshold, int initialSizePerCol,
int pageSizeThreshold,
ParquetProperties parquetProps) {
super();
this.pageSizeThreshold = pageSizeThreshold;
this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps));
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
}
this.columns = unmodifiableMap(mcolumns);
this.writers = this.columns.values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;

import parquet.Log;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnWriter;
import parquet.column.ParquetProperties;
Expand All @@ -31,6 +32,9 @@
import parquet.io.ParquetEncodingException;
import parquet.io.api.Binary;

import static java.lang.Math.max;
import static java.lang.Math.pow;

/**
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
*
Expand All @@ -41,6 +45,7 @@ final class ColumnWriterV1 implements ColumnWriter {
private static final Log LOG = Log.getLog(ColumnWriterV1.class);
private static final boolean DEBUG = Log.DEBUG;
private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
private static final int MIN_SLAB_SIZE = 64;

private final ColumnDescriptor path;
private final PageWriter pageWriter;
Expand All @@ -57,7 +62,6 @@ public ColumnWriterV1(
ColumnDescriptor path,
PageWriter pageWriter,
int pageSizeThreshold,
int initialSizePerCol,
int dictionaryPageSizeThreshold,
boolean enableDictionary,
WriterVersion writerVersion) {
Expand All @@ -69,9 +73,12 @@ public ColumnWriterV1(
resetStatistics();

ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);

this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);

int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/
package parquet.column.impl;

import static java.lang.Math.max;
import static java.lang.Math.pow;
import static parquet.bytes.BytesUtils.getWidthFromMaxInt;

import java.io.IOException;

import parquet.Ints;
import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnWriter;
import parquet.column.Encoding;
Expand All @@ -43,6 +46,7 @@
final class ColumnWriterV2 implements ColumnWriter {
private static final Log LOG = Log.getLog(ColumnWriterV2.class);
private static final boolean DEBUG = Log.DEBUG;
private static final int MIN_SLAB_SIZE = 64;

private final ColumnDescriptor path;
private final PageWriter pageWriter;
Expand All @@ -57,14 +61,17 @@ final class ColumnWriterV2 implements ColumnWriter {
public ColumnWriterV2(
ColumnDescriptor path,
PageWriter pageWriter,
int initialSizePerCol,
ParquetProperties parquetProps) {
ParquetProperties parquetProps,
int pageSize) {
this.path = path;
this.pageWriter = pageWriter;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should tweak the initial size here.
The repetition and definition level writers should get a tiny initial size (100 bytes?) in case the values are always null or always defined.

resetStatistics();
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);

this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);

int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ public class BitPackingValuesWriter extends ValuesWriter {

/**
* @param bound the maximum value stored by this column
* @param pageSize the page size, passed through to the underlying CapacityByteArrayOutputStream
*/
public BitPackingValuesWriter(int bound, int initialCapacity) {
public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) {
this.bitsPerValue = getWidthFromMaxInt(bound);
this.out = new CapacityByteArrayOutputStream(initialCapacity);
this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
init();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class BitWriter {
}
}

public BitWriter(int initialCapacity) {
this.baos = new CapacityByteArrayOutputStream(initialCapacity);
public BitWriter(int initialCapacity, int pageSize) {
this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
}

public void writeBit(boolean bit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static ValuesReader getBoundedReader(int bound) {
return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound);
}

public static ValuesWriter getBoundedWriter(int bound, int initialCapacity) {
return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity);
public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) {
return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ class BoundedIntValuesWriter extends ValuesWriter {
}
}

public BoundedIntValuesWriter(int bound, int initialCapacity) {
public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) {
if (bound == 0) {
throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead.");
}
this.bitWriter = new BitWriter(initialCapacity);
this.bitWriter = new BitWriter(initialCapacity, pageSize);
bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2));
shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue);
if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
* reused between flushes.
*/
public static final int MAX_BITWIDTH = 32;

public static final int DEFAULT_NUM_BLOCK_VALUES = 128;

public static final int DEFAULT_NUM_MINIBLOCKS = 4;

private final CapacityByteArrayOutputStream baos;
Expand Down Expand Up @@ -107,17 +107,17 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
* it will be reset after each flush
*/
private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
public DeltaBinaryPackingValuesWriter(int slabSize) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize);

public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize);
}

public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize) {
public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) {
this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
bitWidths = new int[config.miniBlockNumInABlock];
deltaBlockBuffer = new int[blockSizeInValues];
miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
baos = new CapacityByteArrayOutputStream(slabSize);
baos = new CapacityByteArrayOutputStream(slabSize, pageSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* <pre>
* {@code
* delta-length-byte-array : length* byte-array*
* }
* }
* </pre>
* @author Aniket Mokashi
*
Expand All @@ -45,13 +45,13 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;

public DeltaLengthByteArrayValuesWriter(int initialSize) {
arrayOut = new CapacityByteArrayOutputStream(initialSize);
public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) {
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
out = new LittleEndianDataOutputStream(arrayOut);
lengthWriter = new DeltaBinaryPackingValuesWriter(
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
initialSize);
initialSize, pageSize);
}

@Override
Expand Down Expand Up @@ -98,6 +98,6 @@ public long getAllocatedSize() {

@Override
public String memUsageString(String prefix) {
return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
}
}
Loading