Remove initialSlabSize decision from InternalParquetRecordWriter, use a simpler heuristic in the column writers instead

isnotinvain committed Feb 21, 2015
1 parent 3a0f8e4 commit 6a20e8b
Showing 6 changed files with 35 additions and 31 deletions.
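
The change in one place: InternalParquetRecordWriter previously computed an initial page buffer size from the row-group size and column count and threaded it through every constructor; now each column writer derives its own starting slab size from the page size alone. A minimal sketch of the new rule (MIN_SLAB_SIZE matches the constant added below; chooseInitialSlabSize is a hypothetical helper name, not a method in this commit):

    // Sketch of the sizing rule the column writers now apply inline.
    static final int MIN_SLAB_SIZE = 64;

    static int chooseInitialSlabSize(int pageSizeThreshold) {
      // Start 2^10 = 1024x smaller than the page size, so roughly ten
      // doublings reach the threshold; never start below MIN_SLAB_SIZE.
      return Math.max(MIN_SLAB_SIZE, pageSizeThreshold / 1024);
    }

For the common 1 MB page size this yields a 1024-byte initial slab, as the comments in the diff note.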
ParquetProperties.java

@@ -202,19 +202,20 @@ public boolean isEnableDictionary() {
 
   public ColumnWriteStore newColumnWriteStore(
       MessageType schema,
-      PageWriteStore pageStore, int pageSize,
-      int initialPageBufferSize) {
+      PageWriteStore pageStore,
+      int pageSize) {
     switch (writerVersion) {
     case PARQUET_1_0:
       return new ColumnWriteStoreV1(
           pageStore,
-          pageSize, initialPageBufferSize, dictionaryPageSizeThreshold,
+          pageSize,
+          dictionaryPageSizeThreshold,
           enableDictionary, writerVersion);
     case PARQUET_2_0:
       return new ColumnWriteStoreV2(
           schema,
           pageStore,
-          pageSize, initialPageBufferSize,
+          pageSize,
           new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
     default:
       throw new IllegalArgumentException("unknown version " + writerVersion);
ColumnWriteStoreV1.java

@@ -36,14 +36,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
   private final int pageSizeThreshold;
   private final int dictionaryPageSizeThreshold;
   private final boolean enableDictionary;
-  private final int initialSizePerCol;
   private final WriterVersion writerVersion;
 
-  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
+  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
     super();
     this.pageWriteStore = pageWriteStore;
     this.pageSizeThreshold = pageSizeThreshold;
-    this.initialSizePerCol = initialSizePerCol;
     this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
     this.enableDictionary = enableDictionary;
     this.writerVersion = writerVersion;

@@ -64,7 +62,7 @@ public Set<ColumnDescriptor> getColumnDescriptors() {
 
   private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
     PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-    return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
+    return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
   }
 
   @Override
ColumnWriteStoreV2.java

@@ -53,15 +53,15 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
   public ColumnWriteStoreV2(
       MessageType schema,
       PageWriteStore pageWriteStore,
-      int pageSizeThreshold, int initialSizePerCol,
+      int pageSizeThreshold,
       ParquetProperties parquetProps) {
     super();
     this.pageSizeThreshold = pageSizeThreshold;
     this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
     Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
     for (ColumnDescriptor path : schema.getColumns()) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps, pageSizeThreshold));
+      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
     }
     this.columns = unmodifiableMap(mcolumns);
     this.writers = this.columns.values();
ColumnWriterV1.java

@@ -31,6 +31,9 @@
 import parquet.io.ParquetEncodingException;
 import parquet.io.api.Binary;
 
+import static java.lang.Math.max;
+import static java.lang.Math.pow;
+
 /**
  * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
  *

@@ -41,6 +44,7 @@ final class ColumnWriterV1 implements ColumnWriter {
   private static final Log LOG = Log.getLog(ColumnWriterV1.class);
   private static final boolean DEBUG = Log.DEBUG;
   private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
+  private static final int MIN_SLAB_SIZE = 64;
 
   private final ColumnDescriptor path;
   private final PageWriter pageWriter;

@@ -57,7 +61,6 @@ public ColumnWriterV1(
       ColumnDescriptor path,
       PageWriter pageWriter,
       int pageSizeThreshold,
-      int initialSizePerCol,
       int dictionaryPageSizeThreshold,
       boolean enableDictionary,
       WriterVersion writerVersion) {

@@ -69,9 +72,15 @@ public ColumnWriterV1(
     resetStatistics();
 
     ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
-    this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol, pageSizeThreshold);
-    this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol, pageSizeThreshold);
-    this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSizeThreshold);
+
+    this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+    this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+
+    // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
+    // eg for page size of 1MB we start at 1024 bytes.
+    // we also don't want to start too small, so we also apply a minimum.
+    int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSizeThreshold / pow(2, 10))));
+    this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
   }
 
   private void log(Object value, int r, int d) {
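
Working through the comment above: with a 1 MB page, the data column starts at 1,048,576 / 2^10 = 1,024 bytes, and if the buffer doubles each time it fills (the growth pattern the comment assumes), exactly ten doublings reach the page size. A quick check of that arithmetic:

    // Assumes doubling growth, per the comment in the diff; values illustrative.
    int pageSize = 1 << 20;                    // 1 MB page
    int slab = Math.max(64, pageSize / 1024);  // 1024-byte initial slab
    int doublings = 0;
    while (slab < pageSize) {
      slab <<= 1;
      doublings++;
    }
    System.out.println(doublings);             // prints 10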
ColumnWriterV2.java

@@ -15,6 +15,8 @@
  */
 package parquet.column.impl;
 
+import static java.lang.Math.max;
+import static java.lang.Math.pow;
 import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
 
 import java.io.IOException;

@@ -43,6 +45,7 @@
 final class ColumnWriterV2 implements ColumnWriter {
   private static final Log LOG = Log.getLog(ColumnWriterV2.class);
   private static final boolean DEBUG = Log.DEBUG;
+  private static final int MIN_SLAB_SIZE = 64;
 
   private final ColumnDescriptor path;
   private final PageWriter pageWriter;

@@ -57,15 +60,20 @@ final class ColumnWriterV2 implements ColumnWriter {
   public ColumnWriterV2(
       ColumnDescriptor path,
       PageWriter pageWriter,
-      int initialSizePerCol,
       ParquetProperties parquetProps,
       int pageSize) {
     this.path = path;
     this.pageWriter = pageWriter;
     resetStatistics();
-    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol, pageSize);
-    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol, pageSize);
-    this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSize);
+
+    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
+    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
+
+    // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize
+    // eg for page size of 1MB we start at 1024 bytes.
+    // we also don't want to start too small, so we also apply a minimum.
+    int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSize / pow(2, 10))));
+    this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
   }
 
   private void log(Object value, int r, int d) {
InternalParquetRecordWriter.java

@@ -42,7 +42,6 @@
 class InternalParquetRecordWriter<T> {
   private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
 
-  private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;

@@ -98,22 +97,11 @@ public InternalParquetRecordWriter(
   }
 
   private void initStore() {
-    // we don't want this number to be too small
-    // ideally we divide the block equally across the columns
-    // it is unlikely all columns are going to be the same size.
-    // its value is likely below Integer.MAX_VALUE (2GB), although rowGroupSize is a long type.
-    // therefore this size is cast to int, since allocating byte array in under layer needs to
-    // limit the array size in an int scope.
-    int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5));
     pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
-    // we don't want this number to be too small either
-    // ideally, slightly bigger than the page size, but not bigger than the block buffer
-    int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
     columnStore = parquetProperties.newColumnWriteStore(
         schema,
         pageStore,
-        pageSize,
-        initialPageBufferSize);
+        pageSize);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
   }
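
For scale, compare the removed heuristic against the new one: the old path computed initialBlockBufferSize = max(64 KB, rowGroupSize / numColumns / 5), then initialPageBufferSize = max(64 KB, min(pageSize + pageSize/10, initialBlockBufferSize)). Plugging in assumed typical settings (128 MB row group, 20 columns, 1 MB pages; these numbers are illustrative, not from the commit):

    // Old vs. new initial buffer size, using the formulas visible in this diff.
    long rowGroupSize = 128L << 20;  // 128 MB row group (assumed)
    int numColumns = 20;             // assumed
    int pageSize = 1 << 20;          // 1 MB page (assumed)
    int minBuf = 64 * 1024;          // old MINIMUM_BUFFER_SIZE

    long oldBlockBuf = Math.max(minBuf, rowGroupSize / numColumns / 5);                  // ~1.34 MB
    long oldPageBuf = Math.max(minBuf, Math.min(pageSize + pageSize / 10, oldBlockBuf)); // ~1.15 MB
    int newSlab = Math.max(64, pageSize / 1024);                                         // 1 KB

Each column writer used to allocate roughly a full page of buffer up front; it now starts about three orders of magnitude smaller and grows only as data arrives.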
