Skip to content

Commit

Permalink
refactor CapacityByteArray to be aware of page size
Browse files Browse the repository at this point in the history
  • Loading branch information
julienledem committed Jan 8, 2015
1 parent 95c8fb6 commit 1df4a71
Show file tree
Hide file tree
Showing 36 changed files with 202 additions and 216 deletions.
38 changes: 19 additions & 19 deletions parquet-column/src/main/java/parquet/column/ParquetProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,29 @@ public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean
this.enableDictionary = enableDict;
}

public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol) {
public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
if (maxLevel == 0) {
return new DevNullValuesWriter();
} else {
return new RunLengthBitPackingHybridValuesWriter(
getWidthFromMaxInt(maxLevel), initialSizePerCol);
getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
}
}

private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol) {
private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch (path.getType()) {
case BOOLEAN:
return new BooleanPlainValuesWriter();
case INT96:
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
case FIXED_LEN_BYTE_ARRAY:
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
case BINARY:
case INT32:
case INT64:
case DOUBLE:
case FLOAT:
return new PlainValuesWriter(initialSizePerCol);
return new PlainValuesWriter(initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand Down Expand Up @@ -128,24 +128,24 @@ private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initi
}
}

private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol) {
private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch(writerVersion) {
case PARQUET_1_0:
return plainWriter(path, initialSizePerCol);
return plainWriter(path, initialSizePerCol, pageSize);
case PARQUET_2_0:
switch (path.getType()) {
case BOOLEAN:
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol);
return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return new DeltaByteArrayWriter(initialSizePerCol);
return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
case INT32:
return new DeltaBinaryPackingValuesWriter(initialSizePerCol);
return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
case INT96:
case INT64:
case DOUBLE:
case FLOAT:
return plainWriter(path, initialSizePerCol);
return plainWriter(path, initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand All @@ -154,8 +154,8 @@ private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePe
}
}

private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol);
private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
if (enableDictionary) {
return FallbackValuesWriter.of(
dictionaryWriter(path, initialSizePerCol),
Expand All @@ -165,24 +165,24 @@ private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSi
}
}

public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) {
public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
switch (path.getType()) {
case BOOLEAN: // no dictionary encoding for boolean
return writerToFallbackTo(path, initialSizePerCol);
return writerToFallbackTo(path, initialSizePerCol, pageSize);
case FIXED_LEN_BYTE_ARRAY:
// dictionary encoding for that type was not enabled in PARQUET 1.0
if (writerVersion == WriterVersion.PARQUET_2_0) {
return dictWriterWithFallBack(path, initialSizePerCol);
return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
} else {
return writerToFallbackTo(path, initialSizePerCol);
return writerToFallbackTo(path, initialSizePerCol, pageSize);
}
case BINARY:
case INT32:
case INT64:
case INT96:
case DOUBLE:
case FLOAT:
return dictWriterWithFallBack(path, initialSizePerCol);
return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
default:
throw new IllegalArgumentException("Unknown type " + path.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ColumnWriteStoreV2(
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps));
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps, pageSizeThreshold));
}
this.columns = unmodifiableMap(mcolumns);
this.writers = this.columns.values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public ColumnWriterV1(
resetStatistics();

ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol, pageSizeThreshold);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol, pageSizeThreshold);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSizeThreshold);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ public ColumnWriterV2(
ColumnDescriptor path,
PageWriter pageWriter,
int initialSizePerCol,
ParquetProperties parquetProps) {
ParquetProperties parquetProps,
int pageSize) {
this.path = path;
this.pageWriter = pageWriter;
resetStatistics();
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol, pageSize);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol, pageSize);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSize);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ public class BitPackingValuesWriter extends ValuesWriter {

/**
* @param bound the maximum value stored by this column
* @param pageSize
*/
public BitPackingValuesWriter(int bound, int initialCapacity) {
public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) {
this.bitsPerValue = getWidthFromMaxInt(bound);
this.out = new CapacityByteArrayOutputStream(initialCapacity);
this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
init();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class BitWriter {
}
}

public BitWriter(int initialCapacity) {
this.baos = new CapacityByteArrayOutputStream(initialCapacity);
public BitWriter(int initialCapacity, int pageSize) {
this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
}

public void writeBit(boolean bit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static ValuesReader getBoundedReader(int bound) {
return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound);
}

public static ValuesWriter getBoundedWriter(int bound, int initialCapacity) {
return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity);
public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) {
return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ class BoundedIntValuesWriter extends ValuesWriter {
}
}

public BoundedIntValuesWriter(int bound, int initialCapacity) {
public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) {
if (bound == 0) {
throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead.");
}
this.bitWriter = new BitWriter(initialCapacity);
this.bitWriter = new BitWriter(initialCapacity, pageSize);
bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2));
shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue);
if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
* reused between flushes.
*/
public static final int MAX_BITWIDTH = 32;

public static final int DEFAULT_NUM_BLOCK_VALUES = 128;

public static final int DEFAULT_NUM_MINIBLOCKS = 4;

private final CapacityByteArrayOutputStream baos;
Expand Down Expand Up @@ -107,17 +107,17 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
* it will be reset after each flush
*/
private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
public DeltaBinaryPackingValuesWriter(int slabSize) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize);

public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize);
}

public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize) {
public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) {
this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
bitWidths = new int[config.miniBlockNumInABlock];
deltaBlockBuffer = new int[blockSizeInValues];
miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
baos = new CapacityByteArrayOutputStream(slabSize);
baos = new CapacityByteArrayOutputStream(slabSize, pageSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* <pre>
* {@code
* delta-length-byte-array : length* byte-array*
* }
* }
* </pre>
* @author Aniket Mokashi
*
Expand All @@ -45,13 +45,13 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;

public DeltaLengthByteArrayValuesWriter(int initialSize) {
arrayOut = new CapacityByteArrayOutputStream(initialSize);
public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) {
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
out = new LittleEndianDataOutputStream(arrayOut);
lengthWriter = new DeltaBinaryPackingValuesWriter(
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
initialSize);
initialSize, pageSize);
}

@Override
Expand Down Expand Up @@ -98,6 +98,6 @@ public long getAllocatedSize() {

@Override
public String memUsageString(String prefix) {
return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* <pre>
* {@code
* delta-length-byte-array : prefix-length* suffixes*
* }
* }
* </pre>
* @author Aniket Mokashi
*
Expand All @@ -38,9 +38,9 @@ public class DeltaByteArrayWriter extends ValuesWriter{
private ValuesWriter suffixWriter;
private byte[] previous;

public DeltaByteArrayWriter(int initialCapacity) {
this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity);
this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity);
public DeltaByteArrayWriter(int initialCapacity, int pageSize) {
this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize);
this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize);
this.previous = new byte[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public BytesInput getBytes() {
if (DEBUG) LOG.debug("max dic id " + maxDicId);
int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
// TODO: what is a good initialCapacity?
RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024);
RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024, maxDictionaryByteSize);
IntIterator iterator = encodedValues.iterator();
try {
while (iterator.hasNext()) {
Expand Down Expand Up @@ -237,7 +237,7 @@ public void writeBytes(Binary v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -312,7 +312,7 @@ public void writeBytes(Binary value) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize);
FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize);
Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -357,7 +357,7 @@ public void writeLong(long v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
LongIterator longIterator = longDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -429,7 +429,7 @@ public void writeDouble(double v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -501,7 +501,7 @@ public void writeInteger(int v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
it.unimi.dsi.fastutil.ints.IntIterator intIterator = intDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down Expand Up @@ -573,7 +573,7 @@ public void writeFloat(float v) {
public DictionaryPage createDictionaryPage() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize);
PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
FloatIterator floatIterator = floatDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
*/
public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
private static final Log LOG = Log.getLog(PlainValuesWriter.class);

private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;
private int length;
public FixedLenByteArrayPlainValuesWriter(int length, int initialSize) {

public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) {
this.length = length;
this.arrayOut = new CapacityByteArrayOutputStream(initialSize);
this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
this.out = new LittleEndianDataOutputStream(arrayOut);
}

Expand All @@ -56,7 +56,7 @@ public final void writeBytes(Binary v) {
throw new ParquetEncodingException("could not write fixed bytes", e);
}
}

@Override
public long getBufferedSize() {
return arrayOut.size();
Expand All @@ -72,7 +72,7 @@ public BytesInput getBytes() {
if (Log.DEBUG) LOG.debug("writing a buffer of size " + arrayOut.size());
return BytesInput.from(arrayOut);
}

@Override
public void reset() {
arrayOut.reset();
Expand All @@ -82,7 +82,7 @@ public void reset() {
public long getAllocatedSize() {
return arrayOut.getCapacity();
}

@Override
public Encoding getEncoding() {
return Encoding.PLAIN;
Expand Down
Loading

0 comments on commit 1df4a71

Please sign in to comment.