Use simpler settings for column chunk writer
isnotinvain committed Feb 21, 2015
1 parent b2736a1 commit 3a0f8e4
Showing 3 changed files with 11 additions and 5 deletions.

CapacityByteArrayOutputStream.java
@@ -78,6 +78,7 @@ public CapacityByteArrayOutputStream(int initialSlabSize) {
   public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
     checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
     checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
+    checkArgument(initialSlabSize <= maxCapacityHint, "maxCapacityHint can't be less than initialSlabSize");
     this.initialSlabSize = initialSlabSize;
     this.maxCapacityHint = maxCapacityHint;
     reset();
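
The two arguments validated above drive how the stream allocates memory: it buffers data in a list of byte-array slabs rather than one resizable array. A minimal sketch of the growth policy they imply, written for illustration only (the method name nextSlabSize and the 1/5 fraction are assumptions, not the library's actual code):

  // Slabs double while total usage is below maxCapacityHint, then stay a
  // fixed size, so capacity growth shifts from geometric to linear.
  static int nextSlabSize(int initialSlabSize, int maxCapacityHint, long bytesUsedSoFar) {
    if (bytesUsedSoFar < maxCapacityHint) {
      // doubling phase: the next slab matches everything written so far
      return (int) Math.max(initialSlabSize, bytesUsedSoFar);
    }
    // linear phase: a constant-size slab, here an assumed fifth of the hint
    return Math.max(initialSlabSize, maxCapacityHint / 5);
  }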

ColumnChunkPageWriteStore.java
@@ -41,6 +41,7 @@
 
 class ColumnChunkPageWriteStore implements PageWriteStore {
   private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
+  private static final int COLUMN_CHUNK_WRITER_MAX_SIZE_HINT = 64 * 1024;
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -61,10 +62,14 @@ private static final class ColumnChunkPageWriter implements PageWriter {
 
   private Statistics totalStatistics;
 
-  private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize, int pageSize) {
+  private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) {
     this.path = path;
     this.compressor = compressor;
-    this.buf = new CapacityByteArrayOutputStream(initialSize, pageSize);
+
+    // this writer will write many pages, so we make the initial slab size 1 page size.
+    // It will then double over time until it reaches COLUMN_CHUNK_WRITER_MAX_SIZE_HINT at
+    // which point it will grow linearly.
+    this.buf = new CapacityByteArrayOutputStream(pageSize, COLUMN_CHUNK_WRITER_MAX_SIZE_HINT);
     this.totalStatistics = getStatsBasedOnType(this.path.getType());
   }
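
To make the new comment concrete, here is an illustrative trace using the hypothetical nextSlabSize() policy sketched earlier and a made-up 16 KB page size (chosen small so the 64 KB hint is reached quickly; these numbers are not from this commit):

  int pageSize = 16 * 1024;   // assumed for illustration
  int hint = 64 * 1024;       // COLUMN_CHUNK_WRITER_MAX_SIZE_HINT
  long used = 0;
  for (int i = 0; i < 5; i++) {
    int slab = nextSlabSize(pageSize, hint, used);
    System.out.println("slab " + i + ": " + slab + " bytes");
    used += slab;
  }
  // roughly: 16 KB, 16 KB, 32 KB, then constant-size slabs (linear growth)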

@@ -202,9 +207,9 @@ public String memUsageString(String prefix) {
 
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
 
-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize, int pageSize) {
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize, pageSize));
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize));
     }
   }
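
For context, a hypothetical call site for the simplified constructor; compressor, schema, and pageSize are placeholders supplied by the caller, not values from this commit:

  // build one buffering PageWriter per column in the schema
  ColumnChunkPageWriteStore pageStore =
      new ColumnChunkPageWriteStore(compressor, schema, pageSize);
  // look up the writer for a column, e.g. the first one
  PageWriter writer = pageStore.getPageWriter(schema.getColumns().get(0));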

InternalParquetRecordWriter.java
@@ -105,7 +105,7 @@ private void initStore() {
     // therefore this size is cast to int, since allocating byte array in under layer needs to
     // limit the array size in an int scope.
     int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5));
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize, pageSize);
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
     // we don't want this number to be too small either
     // ideally, slightly bigger than the page size, but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
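
The sizing arithmetic above is easy to check with concrete numbers. A worked example under assumed values (a 128 MB row group, 10 columns, 1 MB pages, and a 64 KB minimum; all illustrative, none read from this diff):

  long rowGroupSize = 128L * 1024 * 1024;  // assumed 128 MB row group
  int columnCount = 10;                    // assumed column count
  int pageSize = 1024 * 1024;              // assumed 1 MB page size
  int minimumBufferSize = 64 * 1024;       // stands in for MINIMUM_BUFFER_SIZE

  // 128 MB / 10 columns / 5 ≈ 2.6 MB, well above the 64 KB floor
  long initialBlockBufferSize = Math.max(minimumBufferSize, rowGroupSize / columnCount / 5);

  // slightly bigger than a page (≈ 1.1 MB) and capped by the block buffer
  long initialPageBufferSize = Math.max(minimumBufferSize, Math.min(pageSize + pageSize / 10, initialBlockBufferSize));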
