diff --git a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java index 3efe9d0e78..9136996b7a 100644 --- a/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/parquet/bytes/CapacityByteArrayOutputStream.java @@ -29,60 +29,58 @@ import parquet.Log; /** - * functionality of ByteArrayOutputStream without the memory and copy overhead + * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying. + * Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output + * stream grows by allocating a new array (slab) and adding it to a list of previous arrays. * - * It will linearly create a new slab of the initial size when needed (instead of creating a new buffer and copying the data). - * After 10 slabs their size will increase exponentially (similar to {@link ByteArrayOutputStream} behavior) by making the new slab size the size of the existing data. + * Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become + * exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a + * max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the + * total size of this stream nears this max, this stream starts to grow linearly instead of exponentially. + * So new slabs are allocated to be 1/5th of the max capacity hint, + * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly + * twice the needed space when a new slab is added just before the stream is done being used. 
* - * When reusing a buffer it will adjust the slab size based on the previous data size ({@link CapacityByteArrayOutputStream#reset()}) + * When reusing this stream it will adjust the initial slab size based on the previous data size, aiming for fewer + * allocations, with the assumption that a similar amount of data will be written to this stream on re-use. + * See ({@link CapacityByteArrayOutputStream#reset()}). * * @author Julien Le Dem * */ public class CapacityByteArrayOutputStream extends OutputStream { private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class); - private static final byte[] EMPTY_SLAB = new byte[0]; - private int initialSize; - private final int pageSize; - private List slabs = new ArrayList(); + private int initialSlabSize; + private final int maxCapacityHint; + private final List slabs = new ArrayList(); + private byte[] currentSlab; - private int capacity = 0; private int currentSlabIndex; - private int currentSlabPosition; - private int size; + private int bytesAllocated = 0; + private int bytesUsed = 0; /** - * defaults pageSize to 1MB - * @param initialSize + * Defaults maxCapacityHint to 1MB + * @param initialSlabSize * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)} */ @Deprecated - public CapacityByteArrayOutputStream(int initialSize) { - this(initialSize, 1024 * 1024); + public CapacityByteArrayOutputStream(int initialSlabSize) { + this(initialSlabSize, 1024 * 1024); } /** - * @param initialSize the initialSize of the buffer (also slab size) - * @param pageSize + * @param initialSlabSize the size to make the first slab + * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream */ - public CapacityByteArrayOutputStream(int initialSize, int pageSize) { - checkArgument(initialSize > 0, "initialSize must be > 0"); - checkArgument(pageSize > 0, "pageSize must be > 0"); - this.pageSize = pageSize; - initSlabs(initialSize); - } - -
private void initSlabs(int initialSize) { - if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSize)); - this.initialSize = initialSize; - this.slabs.clear(); - this.capacity = 0; - this.currentSlab = EMPTY_SLAB; - this.currentSlabIndex = -1; - this.currentSlabPosition = 0; - this.size = 0; + public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) { + checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0"); + checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0"); + this.initialSlabSize = initialSlabSize; + this.maxCapacityHint = maxCapacityHint; + reset(); } /** @@ -90,56 +88,59 @@ private void initSlabs(int initialSize) { * @param minimumSize the size of the data we want to copy in the new slab */ private void addSlab(int minimumSize) { - this.currentSlabIndex += 1; int nextSlabSize; - if (size == 0) { - nextSlabSize = initialSize; - } else if (size > pageSize / 5) { + + if (bytesUsed == 0) { + nextSlabSize = initialSlabSize; + } else if (bytesUsed > maxCapacityHint / 5) { // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size - nextSlabSize = pageSize / 5; + nextSlabSize = maxCapacityHint / 5; } else { // double the size every time - nextSlabSize = size; + nextSlabSize = bytesUsed; } + if (nextSlabSize < minimumSize) { if (Log.DEBUG) LOG.debug(format("slab size %,d too small for value of size %,d. 
Bumping up slab size", nextSlabSize, minimumSize)); nextSlabSize = minimumSize; } - if (Log.DEBUG) LOG.debug(format("used %d slabs, new slab size %d", currentSlabIndex, nextSlabSize)); + + if (Log.DEBUG) LOG.debug(format("used %d slabs, adding new slab of size %d", slabs.size(), nextSlabSize)); + this.currentSlab = new byte[nextSlabSize]; this.slabs.add(currentSlab); - this.capacity += nextSlabSize; - this.currentSlabPosition = 0; + this.bytesAllocated += nextSlabSize; + this.currentSlabIndex = 0; } @Override public void write(int b) { - if (currentSlabPosition == currentSlab.length) { + if (currentSlabIndex == currentSlab.length) { addSlab(1); } - currentSlab[currentSlabPosition] = (byte) b; - currentSlabPosition += 1; - size += 1; + currentSlab[currentSlabIndex] = (byte) b; + currentSlabIndex += 1; + bytesUsed += 1; } @Override public void write(byte b[], int off, int len) { if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) { - throw new IndexOutOfBoundsException(); + throw new IndexOutOfBoundsException(String.format("len: %d, off: %d", len, off)); } - if (currentSlabPosition + len >= currentSlab.length) { - final int length1 = currentSlab.length - currentSlabPosition; - arraycopy(b, off, currentSlab, currentSlabPosition, length1); + if (currentSlabIndex + len >= currentSlab.length) { + final int length1 = currentSlab.length - currentSlabIndex; + arraycopy(b, off, currentSlab, currentSlabIndex, length1); final int length2 = len - length1; addSlab(length2); - arraycopy(b, off + length1, currentSlab, currentSlabPosition, length2); - currentSlabPosition = length2; + arraycopy(b, off + length1, currentSlab, currentSlabIndex, length2); + currentSlabIndex = length2; } else { - arraycopy(b, off, currentSlab, currentSlabPosition, len); - currentSlabPosition += len; + arraycopy(b, off, currentSlab, currentSlabIndex, len); + currentSlabIndex += len; } - size += len; + bytesUsed += len; } /** @@ -150,18 +151,26 @@ public void write(byte b[], 
int off, int len) { * @exception IOException if an I/O error occurs. */ public void writeTo(OutputStream out) throws IOException { - for (int i = 0; i < currentSlabIndex; i++) { + for (int i = 0; i < slabs.size() - 1; i++) { final byte[] slab = slabs.get(i); out.write(slab, 0, slab.length); } - out.write(currentSlab, 0, currentSlabPosition); + out.write(currentSlab, 0, currentSlabIndex); } /** - * @return the size of the allocated buffer + * @return The total size in bytes of data written to this stream. + */ + public long size() { + return bytesUsed; + } + + /** + * + * @return The total size in bytes currently allocated for this stream. */ public int getCapacity() { - return capacity; + return bytesAllocated; } /** @@ -172,23 +181,22 @@ public int getCapacity() { public void reset() { // readjust slab size. // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size - initSlabs(max(size / 7, initialSize)); - } - - /** - * @return the size of the buffered data - */ - public long size() { - return size; + this.initialSlabSize = max(bytesUsed / 7, initialSlabSize); + if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSlabSize)); + this.slabs.clear(); + this.bytesAllocated = 0; + this.bytesUsed = 0; + this.currentSlab = EMPTY_SLAB; + this.currentSlabIndex = 0; } /** - * @return the index of the last value being written to this stream, which + * @return the index of the last value written to this stream, which * can be passed to {@link #setByte(long, byte)} in order to change it */ public long getCurrentIndex() { - checkArgument(size > 0, "This is an empty stream"); - return size - 1; + checkArgument(bytesUsed > 0, "This is an empty stream"); + return bytesUsed - 1; } /** @@ -198,10 +206,10 @@ public long getCurrentIndex() { * @param value the value to replace it with */ public void setByte(long index, byte value) { - checkArgument(index < size, "Index: " + index + " is >= the current size of: " + size); + 
checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed); long seen = 0; - for (int i = 0; i <= currentSlabIndex; i++) { + for (int i = 0; i < slabs.size(); i++) { byte[] slab = slabs.get(i); if (index < seen + slab.length) { // ok found index @@ -221,7 +229,7 @@ public String memUsageString(String prefix) { } /** - * @return the total count of allocated slabs + * @return the total number of allocated slabs */ int getSlabCount() { return slabs.size();