Skip to content

Commit

Permalink
Some cleanup in CapacityByteArrayOutputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
isnotinvain committed Feb 21, 2015
1 parent 1df4a71 commit b2736a1
Showing 1 changed file with 81 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,117 +29,118 @@
import parquet.Log;

/**
* functionality of ByteArrayOutputStream without the memory and copy overhead
* Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying.
* Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output
* stream grows by allocating a new array (slab) and adding it to a list of previous arrays.
*
* It will linearly create a new slab of the initial size when needed (instead of creating a new buffer and copying the data).
* After 10 slabs their size will increase exponentially (similar to {@link ByteArrayOutputStream} behavior) by making the new slab size the size of the existing data.
* Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become
* exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a
* max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the
* total size of this stream nears this max, this stream starts to grow linearly instead of exponentially.
* So new slabs are allocated to be 1/5th of the max capacity hint,
* instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly
* twice the needed space when a new slab is added just before the stream is done being used.
*
* When reusing a buffer it will adjust the slab size based on the previous data size ({@link CapacityByteArrayOutputStream#reset()})
* When reusing this stream, it will adjust the initial slab size based on the previous data size, aiming for fewer
* allocations, with the assumption that a similar amount of data will be written to this stream on re-use.
* See {@link CapacityByteArrayOutputStream#reset()}.
*
* @author Julien Le Dem
*
*/
public class CapacityByteArrayOutputStream extends OutputStream {
private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class);

private static final byte[] EMPTY_SLAB = new byte[0];

private int initialSize;
private final int pageSize;
private List<byte[]> slabs = new ArrayList<byte[]>();
private int initialSlabSize;
private final int maxCapacityHint;
private final List<byte[]> slabs = new ArrayList<byte[]>();

private byte[] currentSlab;
private int capacity = 0;
private int currentSlabIndex;
private int currentSlabPosition;
private int size;
private int bytesAllocated = 0;
private int bytesUsed = 0;

/**
* defaults pageSize to 1MB
* @param initialSize
* Defaults maxCapacityHint to 1MB
* @param initialSlabSize
* @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)}
*/
@Deprecated
public CapacityByteArrayOutputStream(int initialSize) {
this(initialSize, 1024 * 1024);
public CapacityByteArrayOutputStream(int initialSlabSize) {
this(initialSlabSize, 1024 * 1024);
}

/**
* @param initialSize the initialSize of the buffer (also slab size)
* @param pageSize
* @param initialSlabSize the size to make the first slab
* @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
*/
public CapacityByteArrayOutputStream(int initialSize, int pageSize) {
checkArgument(initialSize > 0, "initialSize must be > 0");
checkArgument(pageSize > 0, "pageSize must be > 0");
this.pageSize = pageSize;
initSlabs(initialSize);
}

private void initSlabs(int initialSize) {
if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSize));
this.initialSize = initialSize;
this.slabs.clear();
this.capacity = 0;
this.currentSlab = EMPTY_SLAB;
this.currentSlabIndex = -1;
this.currentSlabPosition = 0;
this.size = 0;
public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
this.initialSlabSize = initialSlabSize;
this.maxCapacityHint = maxCapacityHint;
reset();
}

/**
* the new slab is guaranteed to be at least minimumSize
* @param minimumSize the size of the data we want to copy in the new slab
*/
private void addSlab(int minimumSize) {
this.currentSlabIndex += 1;
int nextSlabSize;
if (size == 0) {
nextSlabSize = initialSize;
} else if (size > pageSize / 5) {

if (bytesUsed == 0) {
nextSlabSize = initialSlabSize;
} else if (bytesUsed > maxCapacityHint / 5) {
// to avoid an overhead of up to twice the needed size, we switch to linear growth when approaching the target page size
nextSlabSize = pageSize / 5;
nextSlabSize = maxCapacityHint / 5;
} else {
// double the size every time
nextSlabSize = size;
nextSlabSize = bytesUsed;
}

if (nextSlabSize < minimumSize) {
if (Log.DEBUG) LOG.debug(format("slab size %,d too small for value of size %,d. Bumping up slab size", nextSlabSize, minimumSize));
nextSlabSize = minimumSize;
}
if (Log.DEBUG) LOG.debug(format("used %d slabs, new slab size %d", currentSlabIndex, nextSlabSize));

if (Log.DEBUG) LOG.debug(format("used %d slabs, adding new slab of size %d", slabs.size(), nextSlabSize));

this.currentSlab = new byte[nextSlabSize];
this.slabs.add(currentSlab);
this.capacity += nextSlabSize;
this.currentSlabPosition = 0;
this.bytesAllocated += nextSlabSize;
this.currentSlabIndex = 0;
}

@Override
public void write(int b) {
if (currentSlabPosition == currentSlab.length) {
if (currentSlabIndex == currentSlab.length) {
addSlab(1);
}
currentSlab[currentSlabPosition] = (byte) b;
currentSlabPosition += 1;
size += 1;
currentSlab[currentSlabIndex] = (byte) b;
currentSlabIndex += 1;
bytesUsed += 1;
}

@Override
public void write(byte b[], int off, int len) {
if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) - b.length > 0)) {
throw new IndexOutOfBoundsException();
throw new IndexOutOfBoundsException(String.format("len: %d, off: %d", len, off));
}
if (currentSlabPosition + len >= currentSlab.length) {
final int length1 = currentSlab.length - currentSlabPosition;
arraycopy(b, off, currentSlab, currentSlabPosition, length1);
if (currentSlabIndex + len >= currentSlab.length) {
final int length1 = currentSlab.length - currentSlabIndex;
arraycopy(b, off, currentSlab, currentSlabIndex, length1);
final int length2 = len - length1;
addSlab(length2);
arraycopy(b, off + length1, currentSlab, currentSlabPosition, length2);
currentSlabPosition = length2;
arraycopy(b, off + length1, currentSlab, currentSlabIndex, length2);
currentSlabIndex = length2;
} else {
arraycopy(b, off, currentSlab, currentSlabPosition, len);
currentSlabPosition += len;
arraycopy(b, off, currentSlab, currentSlabIndex, len);
currentSlabIndex += len;
}
size += len;
bytesUsed += len;
}

/**
Expand All @@ -150,18 +151,26 @@ public void write(byte b[], int off, int len) {
* @exception IOException if an I/O error occurs.
*/
public void writeTo(OutputStream out) throws IOException {
for (int i = 0; i < currentSlabIndex; i++) {
for (int i = 0; i < slabs.size() - 1; i++) {
final byte[] slab = slabs.get(i);
out.write(slab, 0, slab.length);
}
out.write(currentSlab, 0, currentSlabPosition);
out.write(currentSlab, 0, currentSlabIndex);
}

/**
* @return the size of the allocated buffer
* @return The total size in bytes of data written to this stream.
*/
public long size() {
return bytesUsed;
}

/**
*
* @return The total size in bytes currently allocated for this stream.
*/
public int getCapacity() {
return capacity;
return bytesAllocated;
}

/**
Expand All @@ -172,23 +181,22 @@ public int getCapacity() {
public void reset() {
// readjust slab size.
// 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size
initSlabs(max(size / 7, initialSize));
}

/**
* @return the size of the buffered data
*/
public long size() {
return size;
this.initialSlabSize = max(bytesUsed / 7, initialSlabSize);
if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSlabSize));
this.slabs.clear();
this.bytesAllocated = 0;
this.bytesUsed = 0;
this.currentSlab = EMPTY_SLAB;
this.currentSlabIndex = 0;
}

/**
* @return the index of the last value being written to this stream, which
* @return the index of the last value written to this stream, which
* can be passed to {@link #setByte(long, byte)} in order to change it
*/
public long getCurrentIndex() {
checkArgument(size > 0, "This is an empty stream");
return size - 1;
checkArgument(bytesUsed > 0, "This is an empty stream");
return bytesUsed - 1;
}

/**
Expand All @@ -198,10 +206,10 @@ public long getCurrentIndex() {
* @param value the value to replace it with
*/
public void setByte(long index, byte value) {
checkArgument(index < size, "Index: " + index + " is >= the current size of: " + size);
checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed);

long seen = 0;
for (int i = 0; i <= currentSlabIndex; i++) {
for (int i = 0; i < slabs.size(); i++) {
byte[] slab = slabs.get(i);
if (index < seen + slab.length) {
// ok found index
Expand All @@ -221,7 +229,7 @@ public String memUsageString(String prefix) {
}

/**
* @return the total count of allocated slabs
* @return the total number of allocated slabs
*/
int getSlabCount() {
return slabs.size();
Expand Down

0 comments on commit b2736a1

Please sign in to comment.