From 3a0f8e4269e33ffab77f475cbba3f1a509802c10 Mon Sep 17 00:00:00 2001
From: Alex Levenson
Date: Fri, 20 Feb 2015 17:10:59 -0800
Subject: [PATCH] Use simpler settings for column chunk writer

---
 .../bytes/CapacityByteArrayOutputStream.java         |  1 +
 .../parquet/hadoop/ColumnChunkPageWriteStore.java    | 13 +++++++++----
 .../parquet/hadoop/InternalParquetRecordWriter.java  |  2 +-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
index 9136996b7a..cabfe259f2 100644
--- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -78,6 +78,7 @@ public CapacityByteArrayOutputStream(int initialSlabSize) {
   public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
     checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
     checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
+    checkArgument(initialSlabSize <= maxCapacityHint, "maxCapacityHint can't be less than initialSlabSize");
     this.initialSlabSize = initialSlabSize;
     this.maxCapacityHint = maxCapacityHint;
     reset();
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
index 1a0e42eaee..5d38e4364a 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -41,6 +41,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {

   private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
+  private static final int COLUMN_CHUNK_WRITER_MAX_SIZE_HINT = 64 * 1024;

   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

@@ -61,10 +62,14 @@ private static final class ColumnChunkPageWriter implements PageWriter {

     private Statistics totalStatistics;

-    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize, int pageSize) {
+    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) {
       this.path = path;
       this.compressor = compressor;
-      this.buf = new CapacityByteArrayOutputStream(initialSize, pageSize);
+
+      // this writer will write many pages, so we make the initial slab size 1 page size.
+      // It will then double over time until it reaches COLUMN_CHUNK_WRITER_MAX_SIZE_HINT,
+      // at which point it will grow linearly.
+      this.buf = new CapacityByteArrayOutputStream(pageSize, COLUMN_CHUNK_WRITER_MAX_SIZE_HINT);
       this.totalStatistics = getStatsBasedOnType(this.path.getType());
     }

@@ -202,9 +207,9 @@ public String memUsageString(String prefix) {

   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();

-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize, int pageSize) {
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize, pageSize));
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize));
     }
   }

diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index cd8875d590..973da3d7a8 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -105,7 +105,7 @@ private void initStore() {
     // therefore this size is cast to int, since allocating byte array in under layer needs to
     // limit the array size in an int scope.
     int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5));
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize, pageSize);
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
     // we don't want this number to be too small either
     // ideally, slightly bigger than the page size, but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
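Note (illustrative, not part of the patch): the new comment in ColumnChunkPageWriter describes the buffer growth policy as "double until COLUMN_CHUNK_WRITER_MAX_SIZE_HINT, then grow linearly". The following is a minimal standalone Java sketch of that policy, assuming "grow linearly" means further slabs stay at the hint size; it is not the real parquet.bytes.CapacityByteArrayOutputStream implementation, and the class and method names are made up for the example.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the slab-sizing policy described in the new comment; names are hypothetical.
    class SlabSizingSketch {

      // Returns the sizes of the first `count` slabs: sizes double until they reach the
      // hint, then stay flat, so total capacity grows linearly from that point on.
      static List<Integer> slabSizes(int initialSlabSize, int maxCapacityHint, int count) {
        if (initialSlabSize <= 0 || maxCapacityHint <= 0 || initialSlabSize > maxCapacityHint) {
          // mirrors the checkArgument preconditions in the constructor patched above
          throw new IllegalArgumentException("maxCapacityHint can't be less than initialSlabSize");
        }
        List<Integer> sizes = new ArrayList<Integer>();
        int next = initialSlabSize;
        for (int i = 0; i < count; i++) {
          sizes.add(next);
          next = Math.min(next * 2, maxCapacityHint);
        }
        return sizes;
      }

      public static void main(String[] args) {
        // e.g. an 8 KiB initial slab with a 64 KiB hint ->
        // [8192, 16384, 32768, 65536, 65536, 65536]
        System.out.println(slabSizes(8 * 1024, 64 * 1024, 6));
      }
    }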
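Note (illustrative): before this change, InternalParquetRecordWriter passed initialBlockBufferSize down into the page write store. The snippet below works through the old formula with purely hypothetical numbers (128 MiB row group, 10 columns, 64 KiB stand-in for MINIMUM_BUFFER_SIZE); none of these concrete values come from the patch.

    public class OldSizingExample {
      public static void main(String[] args) {
        // Assumed, illustrative values -- not taken from this patch.
        long rowGroupSize = 128L * 1024 * 1024; // assume a 128 MiB row group
        int columnCount = 10;                   // assume 10 columns
        long minimumBufferSize = 64 * 1024;     // stand-in for MINIMUM_BUFFER_SIZE

        // The formula from the hunk above: max(MINIMUM_BUFFER_SIZE, rowGroupSize / columns / 5)
        long oldInitialBlockBufferSize =
            Math.max(minimumBufferSize, rowGroupSize / columnCount / 5);

        // Prints 2684354 (~2.6 MiB) -- the per-column initial buffer size that used to be
        // passed into ColumnChunkPageWriteStore. After this patch, each ColumnChunkPageWriter
        // instead starts from pageSize and lets CapacityByteArrayOutputStream grow toward
        // COLUMN_CHUNK_WRITER_MAX_SIZE_HINT.
        System.out.println(oldInitialBlockBufferSize);
      }
    }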