Skip to content

Commit

Permalink
Address Julien's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
isnotinvain committed Feb 27, 2015
1 parent 965af7f commit b9abab0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ public class ConcatenatingByteArrayCollector extends BytesInput {
private final List<byte[]> slabs = new ArrayList<byte[]>();
private long size = 0;

public void collect(BytesInput bytes) throws IOException {
collect(bytes.toByteArray());
}

public void collect(byte[] bytes) {
public void collect(BytesInput bytesInput) throws IOException {
byte[] bytes = bytesInput.toByteArray();
slabs.add(bytes);
size += bytes.length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.bytes.ConcatenatingByteArrayCollector;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
Expand Down Expand Up @@ -102,14 +101,14 @@ public void writePage(BytesInput bytes,
dlEncoding,
valuesEncoding,
tempOutputStream);
buf.collect(tempOutputStream.toByteArray());
tempOutputStream.reset();
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
this.totalStatistics.mergeStatistics(statistics);
buf.collect(compressedBytes);
// by concatenating before collecting instead of collecting twice,
// we only allocate one buffer to copy into instead of multiple.
buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
encodings.add(rlEncoding);
encodings.add(dlEncoding);
encodings.add(valuesEncoding);
Expand Down Expand Up @@ -140,16 +139,21 @@ public void writePageV2(
rlByteLength,
dlByteLength,
tempOutputStream);
buf.collect(tempOutputStream.toByteArray());
tempOutputStream.reset();
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
this.totalStatistics.mergeStatistics(statistics);
buf.collect(repetitionLevels);
buf.collect(definitionLevels);
buf.collect(compressedData);

// by concatenating before collecting instead of collecting each part separately,
// we only allocate one buffer to copy into instead of multiple.
buf.collect(
BytesInput.concat(
BytesInput.from(tempOutputStream),
repetitionLevels,
definitionLevels,
compressedData)
);
encodings.add(dataEncoding);
}

Expand Down

0 comments on commit b9abab0

Please sign in to comment.