Don't use CapacityByteArrayOutputStream for writing page chunks
isnotinvain committed Feb 21, 2015
1 parent 6a20e8b commit 8b54667
Showing 12 changed files with 81 additions and 23 deletions.
@@ -143,7 +143,7 @@ public BytesInput getBytes() {
if (DEBUG) LOG.debug("max dic id " + maxDicId);
int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
// TODO: what is a good initialCapacity?
-RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024, maxDictionaryByteSize);
+RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64, maxDictionaryByteSize);
IntIterator iterator = encodedValues.iterator();
try {
while (iterator.hasNext()) {
@@ -38,7 +38,7 @@ public void test() {
MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
ColumnDescriptor col = schema.getColumns().get(0);
MemPageWriter pageWriter = new MemPageWriter();
-ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
+ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
for (int i = 0; i < rows; i++) {
columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
if ((i + 1) % 1000 == 0) {
@@ -73,7 +73,7 @@ public void testOptional() {
MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
ColumnDescriptor col = schema.getColumns().get(0);
MemPageWriter pageWriter = new MemPageWriter();
-ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
+ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
for (int i = 0; i < rows; i++) {
columnWriterV2.writeNull(0, 0);
if ((i + 1) % 1000 == 0) {
@@ -156,6 +156,6 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
}

private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
-return new ColumnWriteStoreV1(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0);
+return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0);
}
}
2 changes: 1 addition & 1 deletion parquet-column/src/test/java/parquet/io/PerfTest.java
@@ -74,7 +74,7 @@ private static void read(MemPageStore memPageStore, MessageType myschema,


private static void write(MemPageStore memPageStore) {
-ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
MessageColumnIO columnIO = newColumnFactory(schema);

GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
2 changes: 1 addition & 1 deletion parquet-column/src/test/java/parquet/io/TestColumnIO.java
@@ -514,7 +514,7 @@ public void testPushParser() {
}

private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
-return new ColumnWriteStoreV1(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
+return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
}

@Test
2 changes: 1 addition & 1 deletion parquet-column/src/test/java/parquet/io/TestFiltered.java
@@ -254,7 +254,7 @@ public void testFilteredNotPaged() {

private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
MemPageStore memPageStore = new MemPageStore(number * 2);
-ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, 800, false, WriterVersion.PARQUET_1_0);
+ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0);

GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
for ( int i = 0; i < number; i++ ) {
@@ -78,7 +78,7 @@ public CapacityByteArrayOutputStream(int initialSlabSize) {
public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
-checkArgument(initialSlabSize <= maxCapacityHint, "maxCapacityHint can't be less than initialSlabSize");
+checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint));
this.initialSlabSize = initialSlabSize;
this.maxCapacityHint = maxCapacityHint;
reset();
@@ -154,7 +154,7 @@ public void write(byte b[], int off, int len) {
public void writeTo(OutputStream out) throws IOException {
for (int i = 0; i < slabs.size() - 1; i++) {
final byte[] slab = slabs.get(i);
-out.write(slab, 0, slab.length);
+out.write(slab);
}
out.write(currentSlab, 0, currentSlabIndex);
}
@@ -0,0 +1,48 @@
package parquet.bytes;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import static java.lang.String.format;

public class ConcatenatingByteArrayCollector extends BytesInput {
private final List<byte[]> slabs = new ArrayList<byte[]>();
private long size = 0;

public void collect(BytesInput bytes) throws IOException {
collect(bytes.toByteArray());
}

public void collect(byte[] bytes) {
slabs.add(bytes);
size += bytes.length;
}

public void reset() {
size = 0;
slabs.clear();
}

@Override
public void writeAllTo(OutputStream out) throws IOException {
for (byte[] slab : slabs) {
out.write(slab);
}
}

@Override
public long size() {
return size;
}

/**
* @param prefix a prefix to be used for every new line in the string
* @return a text representation of the memory usage of this structure
*/
public String memUsageString(String prefix) {
return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size);
}

}
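The collector above is deliberately minimal: it keeps references to the byte[] chunks handed to it, tracks their combined size, and replays them in order in writeAllTo. A small usage sketch follows; only ConcatenatingByteArrayCollector and BytesInput come from this commit, while the class and variable names in the sketch are hypothetical:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import parquet.bytes.BytesInput;
import parquet.bytes.ConcatenatingByteArrayCollector;

public class CollectorSketch {
  public static void main(String[] args) throws IOException {
    ConcatenatingByteArrayCollector collector = new ConcatenatingByteArrayCollector();

    // each chunk is kept as its own slab; nothing is copied into a single growing buffer
    collector.collect(new byte[] { 1, 2, 3 });                  // e.g. a serialized page header
    collector.collect(BytesInput.from(new byte[] { 4, 5, 6 })); // e.g. compressed page bytes

    System.out.println(collector.size());                  // 6
    System.out.println(collector.memUsageString("sketch")); // "sketch ConcatenatingByteArrayCollector 2 slabs, 6 bytes"

    // the collector is itself a BytesInput, so it can be written straight to an output stream
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    collector.writeAllTo(out);
  }
}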
@@ -18,6 +18,7 @@
import static parquet.Log.INFO;
import static parquet.column.statistics.Statistics.getStatsBasedOnType;

+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,6 +29,7 @@
import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.bytes.CapacityByteArrayOutputStream;
+import parquet.bytes.ConcatenatingByteArrayCollector;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.column.page.DictionaryPage;
@@ -50,7 +52,8 @@ private static final class ColumnChunkPageWriter implements PageWriter {
private final ColumnDescriptor path;
private final BytesCompressor compressor;

-private final CapacityByteArrayOutputStream buf;
+private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
+private final ConcatenatingByteArrayCollector buf;
private DictionaryPage dictionaryPage;

private long uncompressedLength;
@@ -69,7 +72,7 @@ private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor,
// this writer will write many pages, so we make the initial slab size 1 page size.
// It will then double over time until it reaches COLUMN_CHUNK_WRITER_MAX_SIZE_HINT at
// which point it will grow linearly.
-this.buf = new CapacityByteArrayOutputStream(pageSize, COLUMN_CHUNK_WRITER_MAX_SIZE_HINT);
+this.buf = new ConcatenatingByteArrayCollector();
this.totalStatistics = getStatsBasedOnType(this.path.getType());
}

@@ -93,6 +96,7 @@ public void writePage(BytesInput bytes,
"Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
+ compressedSize);
}
+tempOutputStream.reset();
parquetMetadataConverter.writeDataPageHeader(
(int)uncompressedSize,
(int)compressedSize,
@@ -101,13 +105,15 @@
rlEncoding,
dlEncoding,
valuesEncoding,
-buf);
+tempOutputStream);
+buf.collect(tempOutputStream.toByteArray());
+tempOutputStream.reset();
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
this.totalStatistics.mergeStatistics(statistics);
-compressedBytes.writeAllTo(buf);
+buf.collect(compressedBytes);
encodings.add(rlEncoding);
encodings.add(dlEncoding);
encodings.add(valuesEncoding);
@@ -129,21 +135,25 @@ public void writePageV2(
int compressedSize = toIntWithCheck(
compressedData.size() + repetitionLevels.size() + definitionLevels.size()
);
+tempOutputStream.reset();
parquetMetadataConverter.writeDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
statistics,
dataEncoding,
-rlByteLength, dlByteLength,
-buf);
+rlByteLength,
+dlByteLength,
+tempOutputStream);
+buf.collect(tempOutputStream.toByteArray());
+tempOutputStream.reset();
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
this.totalStatistics.mergeStatistics(statistics);
-repetitionLevels.writeAllTo(buf);
-definitionLevels.writeAllTo(buf);
-compressedData.writeAllTo(buf);
+buf.collect(repetitionLevels);
+buf.collect(definitionLevels);
+buf.collect(compressedData);
encodings.add(dataEncoding);
}

@@ -167,7 +177,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
writer.writeDictionaryPage(dictionaryPage);
encodings.add(dictionaryPage.getEncoding());
}
-writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
+writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
writer.endColumn();
if (INFO) {
LOG.info(
@@ -185,7 +195,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException {

@Override
public long allocatedSize() {
-return buf.getCapacity();
+return buf.size();
}

@Override
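Both writePage and writePageV2 above follow the same buffering pattern: the page header is serialized into a small, reusable ByteArrayOutputStream, collected into the ConcatenatingByteArrayCollector as one chunk, and the level and data bytes are then collected behind it, instead of everything being accumulated in a single CapacityByteArrayOutputStream. A condensed, hypothetical sketch of that pattern (writeHeader stands in for the parquetMetadataConverter call; the class name is made up):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import parquet.bytes.BytesInput;
import parquet.bytes.ConcatenatingByteArrayCollector;

class PageBufferingSketch {
  // small buffer that is reused for every page header
  private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
  // ordered list of byte[] chunks that make up the column chunk
  private final ConcatenatingByteArrayCollector buf = new ConcatenatingByteArrayCollector();

  void writePage(BytesInput compressedBytes) throws IOException {
    tempOutputStream.reset();
    writeHeader(tempOutputStream);                 // header -> temp buffer
    buf.collect(tempOutputStream.toByteArray());   // header bytes become one collected chunk
    tempOutputStream.reset();
    buf.collect(compressedBytes);                  // page bytes are appended as their own chunk
  }

  private void writeHeader(OutputStream out) throws IOException {
    out.write(new byte[] { 0x15, 0x00 });          // placeholder bytes, not a real Thrift header
  }
}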
@@ -64,7 +64,7 @@ public void test() throws Exception {
writer.start();
writer.startBlock(rowCount);
{
-ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema , initialSize, pageSize);
+ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema, pageSize);
PageWriter pageWriter = store.getPageWriter(col);
pageWriter.writePageV2(
rowCount, nullCount, valueCount,
@@ -56,7 +56,7 @@ public static void main(String[] args) throws Exception {
MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema));

MemPageStore memPageStore = new MemPageStore(0);
-ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
write(memPageStore, columns, schema, pigSchema);
columns.flush();
read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString);
@@ -145,7 +145,7 @@ private <T extends TBase<?,?>> void validate(T expected) throws TException {
final MessageType schema = schemaConverter.convert(thriftClass);
LOG.info(schema);
final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
-final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, 10000, false, WriterVersion.PARQUET_1_0);
+final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false, WriterVersion.PARQUET_1_0);
final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
final StructType thriftType = schemaConverter.toStructType(thriftClass);
ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType);