From b2736a10e89b636d350cd6c8c97a80e04c483f8b Mon Sep 17 00:00:00 2001
From: Alex Levenson
Date: Fri, 20 Feb 2015 16:37:46 -0800
Subject: [PATCH 01/10] Some cleanup in CapacityByteArrayOutputStream

---
 .../bytes/CapacityByteArrayOutputStream.java | 154 +++++++++---------
 1 file changed, 81 insertions(+), 73 deletions(-)

diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
index 3efe9d0e78..9136996b7a 100644
--- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -29,60 +29,58 @@ import parquet.Log;
 /**
- * functionality of ByteArrayOutputStream without the memory and copy overhead
+ * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying.
+ * Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output
+ * stream grows by allocating a new array (slab) and adding it to a list of previous arrays.
  *
- * It will linearly create a new slab of the initial size when needed (instead of creating a new buffer and copying the data).
- * After 10 slabs their size will increase exponentially (similar to {@link ByteArrayOutputStream} behavior) by making the new slab size the size of the existing data.
+ * Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become
+ * exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a
+ * max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the
+ * total size of this stream nears this max, this stream starts to grow linearly instead of exponentially.
+ * So new slabs are allocated to be 1/5th of the max capacity hint,
+ * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly
+ * twice the needed space when a new slab is added just before the stream is done being used.
  *
- * When reusing a buffer it will adjust the slab size based on the previous data size ({@link CapacityByteArrayOutputStream#reset()})
+ * When reusing a this stream it will adjust the initial slab size based on the previous data size, aiming for fewer
+ * allocations, with the assumption that a similar amount of data will be written to this stream on re-use.
+ * See ({@link CapacityByteArrayOutputStream#reset()}).
* * @author Julien Le Dem * */ public class CapacityByteArrayOutputStream extends OutputStream { private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class); - private static final byte[] EMPTY_SLAB = new byte[0]; - private int initialSize; - private final int pageSize; - private List slabs = new ArrayList(); + private int initialSlabSize; + private final int maxCapacityHint; + private final List slabs = new ArrayList(); + private byte[] currentSlab; - private int capacity = 0; private int currentSlabIndex; - private int currentSlabPosition; - private int size; + private int bytesAllocated = 0; + private int bytesUsed = 0; /** - * defaults pageSize to 1MB - * @param initialSize + * Defaults maxCapacityHint to 1MB + * @param initialSlabSize * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)} */ @Deprecated - public CapacityByteArrayOutputStream(int initialSize) { - this(initialSize, 1024 * 1024); + public CapacityByteArrayOutputStream(int initialSlabSize) { + this(initialSlabSize, 1024 * 1024); } /** - * @param initialSize the initialSize of the buffer (also slab size) - * @param pageSize + * @param initialSlabSize the size to make the first slab + * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream */ - public CapacityByteArrayOutputStream(int initialSize, int pageSize) { - checkArgument(initialSize > 0, "initialSize must be > 0"); - checkArgument(pageSize > 0, "pageSize must be > 0"); - this.pageSize = pageSize; - initSlabs(initialSize); - } - - private void initSlabs(int initialSize) { - if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSize)); - this.initialSize = initialSize; - this.slabs.clear(); - this.capacity = 0; - this.currentSlab = EMPTY_SLAB; - this.currentSlabIndex = -1; - this.currentSlabPosition = 0; - this.size = 0; + public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) { + checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0"); + checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0"); + this.initialSlabSize = initialSlabSize; + this.maxCapacityHint = maxCapacityHint; + reset(); } /** @@ -90,56 +88,59 @@ private void initSlabs(int initialSize) { * @param minimumSize the size of the data we want to copy in the new slab */ private void addSlab(int minimumSize) { - this.currentSlabIndex += 1; int nextSlabSize; - if (size == 0) { - nextSlabSize = initialSize; - } else if (size > pageSize / 5) { + + if (bytesUsed == 0) { + nextSlabSize = initialSlabSize; + } else if (bytesUsed > maxCapacityHint / 5) { // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size - nextSlabSize = pageSize / 5; + nextSlabSize = maxCapacityHint / 5; } else { // double the size every time - nextSlabSize = size; + nextSlabSize = bytesUsed; } + if (nextSlabSize < minimumSize) { if (Log.DEBUG) LOG.debug(format("slab size %,d too small for value of size %,d. 
Bumping up slab size", nextSlabSize, minimumSize)); nextSlabSize = minimumSize; } - if (Log.DEBUG) LOG.debug(format("used %d slabs, new slab size %d", currentSlabIndex, nextSlabSize)); + + if (Log.DEBUG) LOG.debug(format("used %d slabs, adding new slab of size %d", slabs.size(), nextSlabSize)); + this.currentSlab = new byte[nextSlabSize]; this.slabs.add(currentSlab); - this.capacity += nextSlabSize; - this.currentSlabPosition = 0; + this.bytesAllocated += nextSlabSize; + this.currentSlabIndex = 0; } @Override public void write(int b) { - if (currentSlabPosition == currentSlab.length) { + if (currentSlabIndex == currentSlab.length) { addSlab(1); } - currentSlab[currentSlabPosition] = (byte) b; - currentSlabPosition += 1; - size += 1; + currentSlab[currentSlabIndex] = (byte) b; + currentSlabIndex += 1; + bytesUsed += 1; } @Override public void write(byte b[], int off, int len) { if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) { - throw new IndexOutOfBoundsException(); + throw new IndexOutOfBoundsException(String.format("len: %d, off: %d", len, off)); } - if (currentSlabPosition + len >= currentSlab.length) { - final int length1 = currentSlab.length - currentSlabPosition; - arraycopy(b, off, currentSlab, currentSlabPosition, length1); + if (currentSlabIndex + len >= currentSlab.length) { + final int length1 = currentSlab.length - currentSlabIndex; + arraycopy(b, off, currentSlab, currentSlabIndex, length1); final int length2 = len - length1; addSlab(length2); - arraycopy(b, off + length1, currentSlab, currentSlabPosition, length2); - currentSlabPosition = length2; + arraycopy(b, off + length1, currentSlab, currentSlabIndex, length2); + currentSlabIndex = length2; } else { - arraycopy(b, off, currentSlab, currentSlabPosition, len); - currentSlabPosition += len; + arraycopy(b, off, currentSlab, currentSlabIndex, len); + currentSlabIndex += len; } - size += len; + bytesUsed += len; } /** @@ -150,18 +151,26 @@ public void write(byte b[], int off, int len) { * @exception IOException if an I/O error occurs. */ public void writeTo(OutputStream out) throws IOException { - for (int i = 0; i < currentSlabIndex; i++) { + for (int i = 0; i < slabs.size() - 1; i++) { final byte[] slab = slabs.get(i); out.write(slab, 0, slab.length); } - out.write(currentSlab, 0, currentSlabPosition); + out.write(currentSlab, 0, currentSlabIndex); } /** - * @return the size of the allocated buffer + * @return The total size in bytes of data written to this stream. + */ + public long size() { + return bytesUsed; + } + + /** + * + * @return The total size in bytes currently allocated for this stream. */ public int getCapacity() { - return capacity; + return bytesAllocated; } /** @@ -172,23 +181,22 @@ public int getCapacity() { public void reset() { // readjust slab size. 
// 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size - initSlabs(max(size / 7, initialSize)); - } - - /** - * @return the size of the buffered data - */ - public long size() { - return size; + this.initialSlabSize = max(bytesUsed / 7, initialSlabSize); + if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSlabSize)); + this.slabs.clear(); + this.bytesAllocated = 0; + this.bytesUsed = 0; + this.currentSlab = EMPTY_SLAB; + this.currentSlabIndex = 0; } /** - * @return the index of the last value being written to this stream, which + * @return the index of the last value written to this stream, which * can be passed to {@link #setByte(long, byte)} in order to change it */ public long getCurrentIndex() { - checkArgument(size > 0, "This is an empty stream"); - return size - 1; + checkArgument(bytesUsed > 0, "This is an empty stream"); + return bytesUsed - 1; } /** @@ -198,10 +206,10 @@ public long getCurrentIndex() { * @param value the value to replace it with */ public void setByte(long index, byte value) { - checkArgument(index < size, "Index: " + index + " is >= the current size of: " + size); + checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed); long seen = 0; - for (int i = 0; i <= currentSlabIndex; i++) { + for (int i = 0; i < slabs.size(); i++) { byte[] slab = slabs.get(i); if (index < seen + slab.length) { // ok found index @@ -221,7 +229,7 @@ public String memUsageString(String prefix) { } /** - * @return the total count of allocated slabs + * @return the total number of allocated slabs */ int getSlabCount() { return slabs.size(); From 3a0f8e4269e33ffab77f475cbba3f1a509802c10 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Fri, 20 Feb 2015 17:10:59 -0800 Subject: [PATCH 02/10] Use simpler settings for column chunk writer --- .../bytes/CapacityByteArrayOutputStream.java | 1 + .../parquet/hadoop/ColumnChunkPageWriteStore.java | 13 +++++++++---- .../parquet/hadoop/InternalParquetRecordWriter.java | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index 9136996b7a..cabfe259f2 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -78,6 +78,7 @@ public CapacityByteArrayOutputStream(int initialSlabSize) { public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) { checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0"); checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0"); + checkArgument(initialSlabSize <= maxCapacityHint, "maxCapacityHint can't be less than initialSlabSize"); this.initialSlabSize = initialSlabSize; this.maxCapacityHint = maxCapacityHint; reset(); diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java index 1a0e42eaee..5d38e4364a 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -41,6 +41,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore { private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class); + private static final int COLUMN_CHUNK_WRITER_MAX_SIZE_HINT = 64 * 1024; 
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); @@ -61,10 +62,14 @@ private static final class ColumnChunkPageWriter implements PageWriter { private Statistics totalStatistics; - private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize, int pageSize) { + private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) { this.path = path; this.compressor = compressor; - this.buf = new CapacityByteArrayOutputStream(initialSize, pageSize); + + // this writer will write many pages, so we make the initial slab size 1 page size. + // It will then double over time until it reaches COLUMN_CHUNK_WRITER_MAX_SIZE_HINT at + // which point it will grow linearly. + this.buf = new CapacityByteArrayOutputStream(pageSize, COLUMN_CHUNK_WRITER_MAX_SIZE_HINT); this.totalStatistics = getStatsBasedOnType(this.path.getType()); } @@ -202,9 +207,9 @@ public String memUsageString(String prefix) { private final Map writers = new HashMap(); - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize, int pageSize) { + public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) { for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize, pageSize)); + writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize)); } } diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java index cd8875d590..973da3d7a8 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java @@ -105,7 +105,7 @@ private void initStore() { // therefore this size is cast to int, since allocating byte array in under layer needs to // limit the array size in an int scope. 
int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5)); - pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize, pageSize); + pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize); // we don't want this number to be too small either // ideally, slightly bigger than the page size, but not bigger than the block buffer int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); From 6a20e8b2381990d565e15288de32dfec8179bd0f Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Fri, 20 Feb 2015 18:02:20 -0800 Subject: [PATCH 03/10] Remove initialSlabSize decision from InternalParquetRecordReader, use a simpler heuristic in the column writers instead --- .../java/parquet/column/ParquetProperties.java | 9 +++++---- .../parquet/column/impl/ColumnWriteStoreV1.java | 6 ++---- .../parquet/column/impl/ColumnWriteStoreV2.java | 4 ++-- .../parquet/column/impl/ColumnWriterV1.java | 17 +++++++++++++---- .../parquet/column/impl/ColumnWriterV2.java | 16 ++++++++++++---- .../hadoop/InternalParquetRecordWriter.java | 14 +------------- 6 files changed, 35 insertions(+), 31 deletions(-) diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java index c083867c09..b32554855c 100644 --- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/parquet/column/ParquetProperties.java @@ -202,19 +202,20 @@ public boolean isEnableDictionary() { public ColumnWriteStore newColumnWriteStore( MessageType schema, - PageWriteStore pageStore, int pageSize, - int initialPageBufferSize) { + PageWriteStore pageStore, + int pageSize) { switch (writerVersion) { case PARQUET_1_0: return new ColumnWriteStoreV1( pageStore, - pageSize, initialPageBufferSize, dictionaryPageSizeThreshold, + pageSize, + dictionaryPageSizeThreshold, enableDictionary, writerVersion); case PARQUET_2_0: return new ColumnWriteStoreV2( schema, pageStore, - pageSize, initialPageBufferSize, + pageSize, new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary)); default: throw new IllegalArgumentException("unknown version " + writerVersion); diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java index 884c665570..06bde5839f 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java @@ -36,14 +36,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore { private final int pageSizeThreshold; private final int dictionaryPageSizeThreshold; private final boolean enableDictionary; - private final int initialSizePerCol; private final WriterVersion writerVersion; - public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) { + public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) { super(); this.pageWriteStore = pageWriteStore; this.pageSizeThreshold = pageSizeThreshold; - this.initialSizePerCol = initialSizePerCol; this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold; 
this.enableDictionary = enableDictionary; this.writerVersion = writerVersion; @@ -64,7 +62,7 @@ public Set getColumnDescriptors() { private ColumnWriterV1 newMemColumn(ColumnDescriptor path) { PageWriter pageWriter = pageWriteStore.getPageWriter(path); - return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion); + return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion); } @Override diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java index 03a219d13e..c1046965d8 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java @@ -53,7 +53,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore { public ColumnWriteStoreV2( MessageType schema, PageWriteStore pageWriteStore, - int pageSizeThreshold, int initialSizePerCol, + int pageSizeThreshold, ParquetProperties parquetProps) { super(); this.pageSizeThreshold = pageSizeThreshold; @@ -61,7 +61,7 @@ public ColumnWriteStoreV2( Map mcolumns = new TreeMap(); for (ColumnDescriptor path : schema.getColumns()) { PageWriter pageWriter = pageWriteStore.getPageWriter(path); - mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps, pageSizeThreshold)); + mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold)); } this.columns = unmodifiableMap(mcolumns); this.writers = this.columns.values(); diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java index ac3fc19e3c..f3d8181369 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java @@ -31,6 +31,9 @@ import parquet.io.ParquetEncodingException; import parquet.io.api.Binary; +import static java.lang.Math.max; +import static java.lang.Math.pow; + /** * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer. 
* @@ -41,6 +44,7 @@ final class ColumnWriterV1 implements ColumnWriter { private static final Log LOG = Log.getLog(ColumnWriterV1.class); private static final boolean DEBUG = Log.DEBUG; private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100; + private static final int MIN_SLAB_SIZE = 64; private final ColumnDescriptor path; private final PageWriter pageWriter; @@ -57,7 +61,6 @@ public ColumnWriterV1( ColumnDescriptor path, PageWriter pageWriter, int pageSizeThreshold, - int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) { @@ -69,9 +72,15 @@ public ColumnWriterV1( resetStatistics(); ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary); - this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol, pageSizeThreshold); - this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol, pageSizeThreshold); - this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSizeThreshold); + + this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold); + this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold); + + // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize + // eg for page size of 1MB we start at 1024 bytes. + // we also don't want to start too small, so we also apply a minimum. + int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSizeThreshold / pow(2, 10)))); + this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold); } private void log(Object value, int r, int d) { diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java index 100bca28ae..e29a786cbc 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java @@ -15,6 +15,8 @@ */ package parquet.column.impl; +import static java.lang.Math.max; +import static java.lang.Math.pow; import static parquet.bytes.BytesUtils.getWidthFromMaxInt; import java.io.IOException; @@ -43,6 +45,7 @@ final class ColumnWriterV2 implements ColumnWriter { private static final Log LOG = Log.getLog(ColumnWriterV2.class); private static final boolean DEBUG = Log.DEBUG; + private static final int MIN_SLAB_SIZE = 64; private final ColumnDescriptor path; private final PageWriter pageWriter; @@ -57,15 +60,20 @@ final class ColumnWriterV2 implements ColumnWriter { public ColumnWriterV2( ColumnDescriptor path, PageWriter pageWriter, - int initialSizePerCol, ParquetProperties parquetProps, int pageSize) { this.path = path; this.pageWriter = pageWriter; resetStatistics(); - this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol, pageSize); - this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol, pageSize); - this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol, pageSize); + + this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), 
MIN_SLAB_SIZE, pageSize); + this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize); + + // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize + // eg for page size of 1MB we start at 1024 bytes. + // we also don't want to start too small, so we also apply a minimum. + int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSize / pow(2, 10)))); + this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize); } private void log(Object value, int r, int d) { diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java index 973da3d7a8..1bfff8ef17 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java @@ -42,7 +42,6 @@ class InternalParquetRecordWriter { private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); - private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; @@ -98,22 +97,11 @@ public InternalParquetRecordWriter( } private void initStore() { - // we don't want this number to be too small - // ideally we divide the block equally across the columns - // it is unlikely all columns are going to be the same size. - // its value is likely below Integer.MAX_VALUE (2GB), although rowGroupSize is a long type. - // therefore this size is cast to int, since allocating byte array in under layer needs to - // limit the array size in an int scope. 
- int initialBlockBufferSize = Ints.checkedCast(max(MINIMUM_BUFFER_SIZE, rowGroupSize / schema.getColumns().size() / 5)); pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize); - // we don't want this number to be too small either - // ideally, slightly bigger than the page size, but not bigger than the block buffer - int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); columnStore = parquetProperties.newColumnWriteStore( schema, pageStore, - pageSize, - initialPageBufferSize); + pageSize); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore)); } From 8b54667650873c03ea66721d0f06bfad0b968f19 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Fri, 20 Feb 2015 20:31:00 -0800 Subject: [PATCH 04/10] Don't use CapacityByteArrayOutputStream for writing page chunks --- .../dictionary/DictionaryValuesWriter.java | 2 +- .../column/impl/TestColumnReaderImpl.java | 4 +- .../parquet/column/mem/TestMemColumn.java | 2 +- .../src/test/java/parquet/io/PerfTest.java | 2 +- .../test/java/parquet/io/TestColumnIO.java | 2 +- .../test/java/parquet/io/TestFiltered.java | 2 +- .../bytes/CapacityByteArrayOutputStream.java | 4 +- .../ConcatenatingByteArrayCollector.java | 48 +++++++++++++++++++ .../hadoop/ColumnChunkPageWriteStore.java | 32 ++++++++----- .../hadoop/TestColumnChunkPageWriteStore.java | 2 +- .../parquet/pig/TupleConsumerPerfTest.java | 2 +- .../thrift/TestParquetReadProtocol.java | 2 +- 12 files changed, 81 insertions(+), 23 deletions(-) create mode 100644 parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java index 9488a4c8ce..25718117c0 100644 --- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java @@ -143,7 +143,7 @@ public BytesInput getBytes() { if (DEBUG) LOG.debug("max dic id " + maxDicId); int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId); // TODO: what is a good initialCapacity? 
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024, maxDictionaryByteSize); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64, maxDictionaryByteSize); IntIterator iterator = encodedValues.iterator(); try { while (iterator.hasNext()) { diff --git a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java index bcff4761e2..dda8187e66 100644 --- a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java +++ b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java @@ -38,7 +38,7 @@ public void test() { MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }"); ColumnDescriptor col = schema.getColumns().get(0); MemPageWriter pageWriter = new MemPageWriter(); - ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048); + ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048); for (int i = 0; i < rows; i++) { columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0); if ((i + 1) % 1000 == 0) { @@ -73,7 +73,7 @@ public void testOptional() { MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }"); ColumnDescriptor col = schema.getColumns().get(0); MemPageWriter pageWriter = new MemPageWriter(); - ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, 1024, new ParquetProperties(1024, PARQUET_2_0, true), 2048); + ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048); for (int i = 0; i < rows; i++) { columnWriterV2.writeNull(0, 0); if ((i + 1) % 1000 == 0) { diff --git a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java index a386bbba92..b0abf55fa2 100644 --- a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java +++ b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java @@ -156,6 +156,6 @@ public void testMemColumnSeveralPagesRepeated() throws Exception { } private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) { - return new ColumnWriteStoreV1(memPageStore, 2048, 2048, 2048, false, WriterVersion.PARQUET_1_0); + return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0); } } diff --git a/parquet-column/src/test/java/parquet/io/PerfTest.java b/parquet-column/src/test/java/parquet/io/PerfTest.java index 9cd31e3097..2642e09d15 100644 --- a/parquet-column/src/test/java/parquet/io/PerfTest.java +++ b/parquet-column/src/test/java/parquet/io/PerfTest.java @@ -74,7 +74,7 @@ private static void read(MemPageStore memPageStore, MessageType myschema, private static void write(MemPageStore memPageStore) { - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); MessageColumnIO columnIO = newColumnFactory(schema); GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); diff --git a/parquet-column/src/test/java/parquet/io/TestColumnIO.java 
b/parquet-column/src/test/java/parquet/io/TestColumnIO.java index d4442df6a8..bf93c6eb17 100644 --- a/parquet-column/src/test/java/parquet/io/TestColumnIO.java +++ b/parquet-column/src/test/java/parquet/io/TestColumnIO.java @@ -514,7 +514,7 @@ public void testPushParser() { } private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) { - return new ColumnWriteStoreV1(memPageStore, 800, 800, 800, useDictionary, WriterVersion.PARQUET_1_0); + return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0); } @Test diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java index 7acf6f1e69..2ba9c19e84 100644 --- a/parquet-column/src/test/java/parquet/io/TestFiltered.java +++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java @@ -254,7 +254,7 @@ public void testFilteredNotPaged() { private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) { MemPageStore memPageStore = new MemPageStore(number * 2); - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, 800, false, WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0); GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); for ( int i = 0; i < number; i++ ) { diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index cabfe259f2..1fe4204565 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -78,7 +78,7 @@ public CapacityByteArrayOutputStream(int initialSlabSize) { public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) { checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0"); checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0"); - checkArgument(initialSlabSize <= maxCapacityHint, "maxCapacityHint can't be less than initialSlabSize"); + checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint)); this.initialSlabSize = initialSlabSize; this.maxCapacityHint = maxCapacityHint; reset(); @@ -154,7 +154,7 @@ public void write(byte b[], int off, int len) { public void writeTo(OutputStream out) throws IOException { for (int i = 0; i < slabs.size() - 1; i++) { final byte[] slab = slabs.get(i); - out.write(slab, 0, slab.length); + out.write(slab); } out.write(currentSlab, 0, currentSlabIndex); } diff --git a/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java new file mode 100644 index 0000000000..ec4f4d38d2 --- /dev/null +++ b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java @@ -0,0 +1,48 @@ +package parquet.bytes; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import static java.lang.String.format; + +public class ConcatenatingByteArrayCollector extends BytesInput { + private final List slabs = new ArrayList(); + private long size = 0; + + public void collect(BytesInput bytes) throws IOException { + collect(bytes.toByteArray()); + } + + public void collect(byte[] bytes) 
{ + slabs.add(bytes); + size += bytes.length; + } + + public void reset() { + size = 0; + slabs.clear(); + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + for (byte[] slab : slabs) { + out.write(slab); + } + } + + @Override + public long size() { + return size; + } + + /** + * @param prefix a prefix to be used for every new line in the string + * @return a text representation of the memory usage of this structure + */ + public String memUsageString(String prefix) { + return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size); + } + +} diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java index 5d38e4364a..e5f9df0603 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -18,6 +18,7 @@ import static parquet.Log.INFO; import static parquet.column.statistics.Statistics.getStatsBasedOnType; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -28,6 +29,7 @@ import parquet.Log; import parquet.bytes.BytesInput; import parquet.bytes.CapacityByteArrayOutputStream; +import parquet.bytes.ConcatenatingByteArrayCollector; import parquet.column.ColumnDescriptor; import parquet.column.Encoding; import parquet.column.page.DictionaryPage; @@ -50,7 +52,8 @@ private static final class ColumnChunkPageWriter implements PageWriter { private final ColumnDescriptor path; private final BytesCompressor compressor; - private final CapacityByteArrayOutputStream buf; + private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream(); + private final ConcatenatingByteArrayCollector buf; private DictionaryPage dictionaryPage; private long uncompressedLength; @@ -69,7 +72,7 @@ private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, // this writer will write many pages, so we make the initial slab size 1 page size. // It will then double over time until it reaches COLUMN_CHUNK_WRITER_MAX_SIZE_HINT at // which point it will grow linearly. 
- this.buf = new CapacityByteArrayOutputStream(pageSize, COLUMN_CHUNK_WRITER_MAX_SIZE_HINT); + this.buf = new ConcatenatingByteArrayCollector(); this.totalStatistics = getStatsBasedOnType(this.path.getType()); } @@ -93,6 +96,7 @@ public void writePage(BytesInput bytes, "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " + compressedSize); } + tempOutputStream.reset(); parquetMetadataConverter.writeDataPageHeader( (int)uncompressedSize, (int)compressedSize, @@ -101,13 +105,15 @@ public void writePage(BytesInput bytes, rlEncoding, dlEncoding, valuesEncoding, - buf); + tempOutputStream); + buf.collect(tempOutputStream.toByteArray()); + tempOutputStream.reset(); this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; this.pageCount += 1; this.totalStatistics.mergeStatistics(statistics); - compressedBytes.writeAllTo(buf); + buf.collect(compressedBytes); encodings.add(rlEncoding); encodings.add(dlEncoding); encodings.add(valuesEncoding); @@ -129,21 +135,25 @@ public void writePageV2( int compressedSize = toIntWithCheck( compressedData.size() + repetitionLevels.size() + definitionLevels.size() ); + tempOutputStream.reset(); parquetMetadataConverter.writeDataPageV2Header( uncompressedSize, compressedSize, valueCount, nullCount, rowCount, statistics, dataEncoding, - rlByteLength, dlByteLength, - buf); + rlByteLength, + dlByteLength, + tempOutputStream); + buf.collect(tempOutputStream.toByteArray()); + tempOutputStream.reset(); this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; this.pageCount += 1; this.totalStatistics.mergeStatistics(statistics); - repetitionLevels.writeAllTo(buf); - definitionLevels.writeAllTo(buf); - compressedData.writeAllTo(buf); + buf.collect(repetitionLevels); + buf.collect(definitionLevels); + buf.collect(compressedData); encodings.add(dataEncoding); } @@ -167,7 +177,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { writer.writeDictionaryPage(dictionaryPage); encodings.add(dictionaryPage.getEncoding()); } - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList(encodings)); + writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList(encodings)); writer.endColumn(); if (INFO) { LOG.info( @@ -185,7 +195,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { @Override public long allocatedSize() { - return buf.getCapacity(); + return buf.size(); } @Override diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java index e1223b666c..60337cddc8 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -64,7 +64,7 @@ public void test() throws Exception { writer.start(); writer.startBlock(rowCount); { - ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema , initialSize, pageSize); + ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema, pageSize); PageWriter pageWriter = store.getPageWriter(col); pageWriter.writePageV2( rowCount, nullCount, valueCount, diff --git a/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java 
b/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java index 68ad1fed3e..9e590d855a 100644 --- a/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java +++ b/parquet-pig/src/test/java/parquet/pig/TupleConsumerPerfTest.java @@ -56,7 +56,7 @@ public static void main(String[] args) throws Exception { MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema)); MemPageStore memPageStore = new MemPageStore(0); - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); write(memPageStore, columns, schema, pigSchema); columns.flush(); read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString); diff --git a/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java index eb2041250d..43e1a884f2 100644 --- a/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java +++ b/parquet-thrift/src/test/java/parquet/thrift/TestParquetReadProtocol.java @@ -145,7 +145,7 @@ private > void validate(T expected) throws TException { final MessageType schema = schemaConverter.convert(thriftClass); LOG.info(schema); final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); - final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, 10000, false, WriterVersion.PARQUET_1_0); + final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false, WriterVersion.PARQUET_1_0); final RecordConsumer recordWriter = columnIO.getRecordWriter(columns); final StructType thriftType = schemaConverter.toStructType(thriftClass); ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType); From 64d6c7fb15c1729698a93ed0ae24157edced64f3 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Fri, 20 Feb 2015 20:46:59 -0800 Subject: [PATCH 05/10] update comments --- .../main/java/parquet/hadoop/ColumnChunkPageWriteStore.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java index e5f9df0603..e5806daaea 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -68,10 +68,6 @@ private static final class ColumnChunkPageWriter implements PageWriter { private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) { this.path = path; this.compressor = compressor; - - // this writer will write many pages, so we make the initial slab size 1 page size. - // It will then double over time until it reaches COLUMN_CHUNK_WRITER_MAX_SIZE_HINT at - // which point it will grow linearly. 
this.buf = new ConcatenatingByteArrayCollector(); this.totalStatistics = getStatsBasedOnType(this.path.getType()); } From a257ee409925f28b6e2320c9c17c214c29f83749 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Mon, 23 Feb 2015 13:37:37 -0800 Subject: [PATCH 06/10] Improve IndexOutOfBoundsException message --- .../main/java/parquet/bytes/CapacityByteArrayOutputStream.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index 1fe4204565..b0744fca91 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -128,7 +128,8 @@ public void write(int b) { public void write(byte b[], int off, int len) { if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) { - throw new IndexOutOfBoundsException(String.format("len: %d, off: %d", len, off)); + throw new IndexOutOfBoundsException( + String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off)); } if (currentSlabIndex + len >= currentSlab.length) { final int length1 = currentSlab.length - currentSlabIndex; From 61c01002800a1160ef5b3979659b46a8a473a711 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Mon, 23 Feb 2015 13:58:02 -0800 Subject: [PATCH 07/10] Make initial slab size heuristic into a helper method, apply in DictionaryValuesWriter as well --- .../parquet/column/impl/ColumnWriterV1.java | 6 ++-- .../parquet/column/impl/ColumnWriterV2.java | 6 ++-- .../dictionary/DictionaryValuesWriter.java | 10 ++++-- .../bytes/CapacityByteArrayOutputStream.java | 34 +++++++++++++++++++ 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java index f3d8181369..fdca2f5502 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java @@ -20,6 +20,7 @@ import java.io.IOException; import parquet.Log; +import parquet.bytes.CapacityByteArrayOutputStream; import parquet.column.ColumnDescriptor; import parquet.column.ColumnWriter; import parquet.column.ParquetProperties; @@ -76,10 +77,7 @@ public ColumnWriterV1( this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold); this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold); - // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize - // eg for page size of 1MB we start at 1024 bytes. - // we also don't want to start too small, so we also apply a minimum. 
- int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSizeThreshold / pow(2, 10)))); + int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10); this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold); } diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java index e29a786cbc..df1075a499 100644 --- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java @@ -24,6 +24,7 @@ import parquet.Ints; import parquet.Log; import parquet.bytes.BytesInput; +import parquet.bytes.CapacityByteArrayOutputStream; import parquet.column.ColumnDescriptor; import parquet.column.ColumnWriter; import parquet.column.Encoding; @@ -69,10 +70,7 @@ public ColumnWriterV2( this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize); this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize); - // initialSlabSize = (pageSize / (2^10)) means we double 10 times before reaching the pageSize - // eg for page size of 1MB we start at 1024 bytes. - // we also don't want to start too small, so we also apply a minimum. - int initialSlabSize = max(MIN_SLAB_SIZE, ((int) (pageSize / pow(2, 10)))); + int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10); this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize); } diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java index 25718117c0..8fca4fd613 100644 --- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java +++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java @@ -39,6 +39,7 @@ import parquet.Log; import parquet.bytes.BytesInput; import parquet.bytes.BytesUtils; +import parquet.bytes.CapacityByteArrayOutputStream; import parquet.column.Encoding; import parquet.column.page.DictionaryPage; import parquet.column.values.RequiresFallback; @@ -62,6 +63,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /* max entries allowed for the dictionary will fail over to plain encoding if reached */ private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1; + private static final int MIN_INITIAL_SLAB_SIZE = 64; /* encoding to label the data page */ private final Encoding encodingForDataPage; @@ -142,8 +144,12 @@ public BytesInput getBytes() { int maxDicId = getDictionarySize() - 1; if (DEBUG) LOG.debug("max dic id " + maxDicId); int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId); - // TODO: what is a good initialCapacity? 
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64, maxDictionaryByteSize); + + int initialSlabSize = + CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10); + + RunLengthBitPackingHybridEncoder encoder = + new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize); IntIterator iterator = encodedValues.iterator(); try { while (iterator.hasNext()) { diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index b0744fca91..70100efb0a 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -16,6 +16,7 @@ package parquet.bytes; import static java.lang.Math.max; +import static java.lang.Math.pow; import static java.lang.String.format; import static java.lang.System.arraycopy; import static parquet.Preconditions.checkArgument; @@ -61,6 +62,39 @@ public class CapacityByteArrayOutputStream extends OutputStream { private int bytesAllocated = 0; private int bytesUsed = 0; + /** + * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it + * will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to be + * a balance between the overhead of creating new slabs and wasting memory by eagerly making + * initial slabs too big. + * + * Note that targetCapacity here need not match maxCapacityHint in the constructor of + * CapacityByteArrayOutputStream, though often that would make sense. + * + * @param minSlabSize no matter what we shouldn't make slabs any smaller than this + * @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have? + * @param targetNumSlabs how many slabs should it take to reach targetTotalSize? + */ + public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) { + // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times + // before reaching the targetCapacity + // eg for page size of 1MB we start at 1024 bytes. + // we also don't want to start too small, so we also apply a minimum. 
+ return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs)))); + } + + /** + * Construct a CapacityByteArrayOutputStream configured such that it's initial slab size is + * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint + */ + public static CapacityByteArrayOutputStream withTargetNumSlabs( + int minSlabSize, int maxCapacityHint, int targetNumSlabs) { + + return new CapacityByteArrayOutputStream( + initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs), + maxCapacityHint); + } + /** * Defaults maxCapacityHint to 1MB * @param initialSlabSize From 9939d8dfabfb770abc57c887097f1939572b9ebb Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Mon, 23 Feb 2015 14:02:19 -0800 Subject: [PATCH 08/10] fix typos in comments --- .../java/parquet/bytes/CapacityByteArrayOutputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index 70100efb0a..c02581271c 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -42,7 +42,7 @@ * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly * twice the needed space when a new slab is added just before the stream is done being used. * - * When reusing a this stream it will adjust the initial slab size based on the previous data size, aiming for fewer + * When reusing this stream it will adjust the initial slab size based on the previous data size, aiming for fewer * allocations, with the assumption that a similar amount of data will be written to this stream on re-use. * See ({@link CapacityByteArrayOutputStream#reset()}). * @@ -73,7 +73,7 @@ public class CapacityByteArrayOutputStream extends OutputStream { * * @param minSlabSize no matter what we shouldn't make slabs any smaller than this * @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have? - * @param targetNumSlabs how many slabs should it take to reach targetTotalSize? + * @param targetNumSlabs how many slabs should it take to reach targetCapacity? 
*/ public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) { // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times From 965af7f0b665087d2fc0a18707d8c5a60375bc51 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Mon, 23 Feb 2015 14:02:45 -0800 Subject: [PATCH 09/10] one more typo --- .../main/java/parquet/bytes/CapacityByteArrayOutputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index c02581271c..eaa068902a 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -84,7 +84,7 @@ public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, } /** - * Construct a CapacityByteArrayOutputStream configured such that it's initial slab size is + * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint */ public static CapacityByteArrayOutputStream withTargetNumSlabs( From b9abab0c7a8a491b5d9d83a78970d7ecc037259c Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Fri, 27 Feb 2015 13:17:43 -0800 Subject: [PATCH 10/10] Address Julien's comment --- .../ConcatenatingByteArrayCollector.java | 7 ++---- .../hadoop/ColumnChunkPageWriteStore.java | 22 +++++++++++-------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java index ec4f4d38d2..9ea8296589 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java +++ b/parquet-encoding/src/main/java/parquet/bytes/ConcatenatingByteArrayCollector.java @@ -11,11 +11,8 @@ public class ConcatenatingByteArrayCollector extends BytesInput { private final List slabs = new ArrayList(); private long size = 0; - public void collect(BytesInput bytes) throws IOException { - collect(bytes.toByteArray()); - } - - public void collect(byte[] bytes) { + public void collect(BytesInput bytesInput) throws IOException { + byte[] bytes = bytesInput.toByteArray(); slabs.add(bytes); size += bytes.length; } diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java index e5806daaea..f17b8d3204 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -28,7 +28,6 @@ import parquet.Log; import parquet.bytes.BytesInput; -import parquet.bytes.CapacityByteArrayOutputStream; import parquet.bytes.ConcatenatingByteArrayCollector; import parquet.column.ColumnDescriptor; import parquet.column.Encoding; @@ -102,14 +101,14 @@ public void writePage(BytesInput bytes, dlEncoding, valuesEncoding, tempOutputStream); - buf.collect(tempOutputStream.toByteArray()); - tempOutputStream.reset(); this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; this.pageCount += 1; this.totalStatistics.mergeStatistics(statistics); - buf.collect(compressedBytes); + // by concatenating before 
collecting instead of collecting twice, + // we only allocate one buffer to copy into instead of multiple. + buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes)); encodings.add(rlEncoding); encodings.add(dlEncoding); encodings.add(valuesEncoding); @@ -140,16 +139,21 @@ public void writePageV2( rlByteLength, dlByteLength, tempOutputStream); - buf.collect(tempOutputStream.toByteArray()); - tempOutputStream.reset(); this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; this.pageCount += 1; this.totalStatistics.mergeStatistics(statistics); - buf.collect(repetitionLevels); - buf.collect(definitionLevels); - buf.collect(compressedData); + + // by concatenating before collecting instead of collecting twice, + // we only allocate one buffer to copy into instead of multiple. + buf.collect( + BytesInput.concat( + BytesInput.from(tempOutputStream), + repetitionLevels, + definitionLevels, + compressedData) + ); encodings.add(dataEncoding); }
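
The patches above rely on two sizing rules: addSlab() doubles the slab size until the stream nears its max capacity hint (after which it grows in maxCapacityHint / 5 steps), and initialSlabSizeHeuristic() picks a first slab small enough that roughly targetNumSlabs doublings reach the target capacity. The standalone sketch below illustrates both rules for a 1MB capacity hint; it mirrors the logic in the patches but is only an illustration, not the Parquet source, and the class name SlabGrowthSketch is invented for the example.

import static java.lang.Math.max;
import static java.lang.Math.pow;

public class SlabGrowthSketch {

  // Mirrors initialSlabSizeHeuristic from patch 07: start small enough that about
  // targetNumSlabs doublings reach targetCapacity, but never below minSlabSize.
  static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
    return max(minSlabSize, (int) (targetCapacity / pow(2, targetNumSlabs)));
  }

  // Mirrors the addSlab() sizing rule from patch 01: double the bytes used so far
  // until the stream nears maxCapacityHint, then grow linearly in maxCapacityHint / 5 steps.
  static int nextSlabSize(int bytesUsed, int initialSlabSize, int maxCapacityHint, int minimumSize) {
    int nextSlabSize;
    if (bytesUsed == 0) {
      nextSlabSize = initialSlabSize;
    } else if (bytesUsed > maxCapacityHint / 5) {
      nextSlabSize = maxCapacityHint / 5; // linear growth near the hint
    } else {
      nextSlabSize = bytesUsed;           // exponential growth early on
    }
    return max(nextSlabSize, minimumSize);
  }

  public static void main(String[] args) {
    int pageSize = 1024 * 1024;                               // 1MB max capacity hint
    int initial = initialSlabSizeHeuristic(64, pageSize, 10); // 1MB / 2^10 = 1024 bytes
    System.out.println("initial slab size: " + initial);

    // Simulate filling the stream and print each slab allocation:
    // 1024, 1024, 2048, 4096, ... then capped at 209715 (1MB / 5) near the end.
    int used = 0;
    while (used < pageSize) {
      int slab = nextSlabSize(used, initial, pageSize, 1);
      System.out.printf("used=%,9d -> allocate %,9d%n", used, slab);
      used += slab;
    }
  }
}

Choosing a larger targetNumSlabs trades a smaller, cheaper first allocation against more slab allocations before the stream reaches its target size.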