Skip to content

Commit

Permalink
Merge pull request #2 from isnotinvain/PR-98
Browse files Browse the repository at this point in the history
Updates to PR-98
  • Loading branch information
julienledem committed Feb 28, 2015
2 parents 1df4a71 + b9abab0 commit 37148d6
Show file tree
Hide file tree
Showing 18 changed files with 242 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,20 @@ public boolean isEnableDictionary() {

public ColumnWriteStore newColumnWriteStore(
MessageType schema,
PageWriteStore pageStore, int pageSize,
int initialPageBufferSize) {
PageWriteStore pageStore,
int pageSize) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(
pageStore,
pageSize, initialPageBufferSize, dictionaryPageSizeThreshold,
pageSize,
dictionaryPageSizeThreshold,
enableDictionary, writerVersion);
case PARQUET_2_0:
return new ColumnWriteStoreV2(
schema,
pageStore,
pageSize, initialPageBufferSize,
pageSize,
new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
private final int pageSizeThreshold;
private final int dictionaryPageSizeThreshold;
private final boolean enableDictionary;
private final int initialSizePerCol;
private final WriterVersion writerVersion;

public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
super();
this.pageWriteStore = pageWriteStore;
this.pageSizeThreshold = pageSizeThreshold;
this.initialSizePerCol = initialSizePerCol;
this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
this.enableDictionary = enableDictionary;
this.writerVersion = writerVersion;
Expand All @@ -64,7 +62,7 @@ public Set<ColumnDescriptor> getColumnDescriptors() {

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
public ColumnWriteStoreV2(
MessageType schema,
PageWriteStore pageWriteStore,
int pageSizeThreshold, int initialSizePerCol,
int pageSizeThreshold,
ParquetProperties parquetProps) {
super();
this.pageSizeThreshold = pageSizeThreshold;
this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps, pageSizeThreshold));
mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
}
this.columns = unmodifiableMap(mcolumns);
this.writers = this.columns.values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;

import parquet.Log;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnWriter;
import parquet.column.ParquetProperties;
Expand All @@ -31,6 +32,9 @@
import parquet.io.ParquetEncodingException;
import parquet.io.api.Binary;

import static java.lang.Math.max;
import static java.lang.Math.pow;

/**
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
*
Expand All @@ -41,6 +45,7 @@ final class ColumnWriterV1 implements ColumnWriter {
private static final Log LOG = Log.getLog(ColumnWriterV1.class);
private static final boolean DEBUG = Log.DEBUG;
private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
private static final int MIN_SLAB_SIZE = 64;

private final ColumnDescriptor path;
private final PageWriter pageWriter;
Expand All @@ -57,7 +62,6 @@ public ColumnWriterV1(
ColumnDescriptor path,
PageWriter pageWriter,
int pageSizeThreshold,
int initialSizePerCol,
int dictionaryPageSizeThreshold,
boolean enableDictionary,
WriterVersion writerVersion) {
Expand All @@ -69,9 +73,12 @@ public ColumnWriterV1(
resetStatistics();

ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol, pageSizeThreshold);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol, pageSizeThreshold);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSizeThreshold);

this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);

int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/
package parquet.column.impl;

import static java.lang.Math.max;
import static java.lang.Math.pow;
import static parquet.bytes.BytesUtils.getWidthFromMaxInt;

import java.io.IOException;

import parquet.Ints;
import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnWriter;
import parquet.column.Encoding;
Expand All @@ -43,6 +46,7 @@
final class ColumnWriterV2 implements ColumnWriter {
private static final Log LOG = Log.getLog(ColumnWriterV2.class);
private static final boolean DEBUG = Log.DEBUG;
private static final int MIN_SLAB_SIZE = 64;

private final ColumnDescriptor path;
private final PageWriter pageWriter;
Expand All @@ -57,15 +61,17 @@ final class ColumnWriterV2 implements ColumnWriter {
public ColumnWriterV2(
ColumnDescriptor path,
PageWriter pageWriter,
int initialSizePerCol,
ParquetProperties parquetProps,
int pageSize) {
this.path = path;
this.pageWriter = pageWriter;
resetStatistics();
this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol, pageSize);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol, pageSize);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSize);

this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);

int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
}

private void log(Object value, int r, int d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.bytes.BytesUtils;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.Encoding;
import parquet.column.page.DictionaryPage;
import parquet.column.values.RequiresFallback;
Expand All @@ -62,6 +63,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req

/* max entries allowed for the dictionary will fail over to plain encoding if reached */
private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
private static final int MIN_INITIAL_SLAB_SIZE = 64;

/* encoding to label the data page */
private final Encoding encodingForDataPage;
Expand Down Expand Up @@ -142,8 +144,12 @@ public BytesInput getBytes() {
int maxDicId = getDictionarySize() - 1;
if (DEBUG) LOG.debug("max dic id " + maxDicId);
int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
// TODO: what is a good initialCapacity?
RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024, maxDictionaryByteSize);

int initialSlabSize =
CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);

RunLengthBitPackingHybridEncoder encoder =
new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
IntIterator iterator = encodedValues.iterator();
try {
while (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void test() {
MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
ColumnDescriptor col = schema.getColumns().get(0);
MemPageWriter pageWriter = new MemPageWriter();
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
for (int i = 0; i < rows; i++) {
columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
if ((i + 1) % 1000 == 0) {
Expand Down Expand Up @@ -73,7 +73,7 @@ public void testOptional() {
MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
ColumnDescriptor col = schema.getColumns().get(0);
MemPageWriter pageWriter = new MemPageWriter();
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
for (int i = 0; i < rows; i++) {
columnWriterV2.writeNull(0, 0);
if ((i + 1) % 1000 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,6 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
}

private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
return new ColumnWriteStoreV1(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0);
return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0);
}
}
2 changes: 1 addition & 1 deletion parquet-column/src/test/java/parquet/io/PerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private static void read(MemPageStore memPageStore, MessageType myschema,


private static void write(MemPageStore memPageStore) {
ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
MessageColumnIO columnIO = newColumnFactory(schema);

GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
Expand Down
2 changes: 1 addition & 1 deletion parquet-column/src/test/java/parquet/io/TestColumnIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void testPushParser() {
}

private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
return new ColumnWriteStoreV1(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion parquet-column/src/test/java/parquet/io/TestFiltered.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void testFilteredNotPaged() {

private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
MemPageStore memPageStore = new MemPageStore(number * 2);
ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, 800, false, WriterVersion.PARQUET_1_0);
ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0);

GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
for ( int i = 0; i < number; i++ ) {
Expand Down
Loading

0 comments on commit 37148d6

Please sign in to comment.