Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {

private void findCurrent() {
boolean found = false;
limitPrecedingCurrent = 0;
for (int i = 0; i < buffers.size(); i++) {
final ByteBuffer buf = buffers.get(i);
final int pos = buf.position();
Expand Down Expand Up @@ -185,6 +186,8 @@ public ChunkBuffer duplicate(int newPosition, int newLimit) {
*/
@Override
public Iterable<ByteBuffer> iterate(int bufferSize) {
Preconditions.checkArgument(bufferSize > 0);

return () -> new Iterator<ByteBuffer>() {
@Override
public boolean hasNext() {
Expand All @@ -198,10 +201,40 @@ public ByteBuffer next() {
}
findCurrent();
ByteBuffer current = buffers.get(currentIndex);
final ByteBuffer duplicated = current.duplicate();
duplicated.limit(current.limit());
current.position(current.limit());
return duplicated;

// If current buffer has enough space or it's the last one, return it.
if (current.remaining() >= bufferSize || currentIndex == buffers.size() - 1) {
final ByteBuffer duplicated = current.duplicate();
int duplicatedLimit = Math.min(current.position() + bufferSize, current.limit());
duplicated.limit(duplicatedLimit);
duplicated.position(current.position());

current.position(duplicatedLimit);
return duplicated;
}

// Otherwise, create a new buffer.
int newBufferSize = Math.min(bufferSize, remaining());
ByteBuffer allocated = ByteBuffer.allocate(newBufferSize);
int remainingToFill = allocated.remaining();

while (remainingToFill > 0) {
final ByteBuffer b = current();
int bytes = Math.min(b.remaining(), remainingToFill);
b.limit(b.position() + bytes);
allocated.put(b);
remainingToFill -= bytes;
advanceCurrent();
}

allocated.flip();

// Up-to-date current.
current = buffers.get(currentIndex);
// Reset
current.limit(current.capacity());

return allocated;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,76 @@ public void rejectsMultipleCurrentBuffers() {
assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
}

@Test
public void testIterateSmallerOverSingleChunk() {
ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(ByteBuffer.allocate(100)));

assertEquals(0, subject.position());
assertEquals(100, subject.remaining());
assertEquals(100, subject.limit());

subject.iterate(25).forEach(buffer -> assertEquals(25, buffer.remaining()));

assertEquals(100, subject.position());
assertEquals(0, subject.remaining());
assertEquals(100, subject.limit());
}

@Test
public void testIterateOverMultipleChunksFitChunkSize() {
ByteBuffer b1 = ByteBuffer.allocate(100);
ByteBuffer b2 = ByteBuffer.allocate(100);
ByteBuffer b3 = ByteBuffer.allocate(100);
ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3));

assertEquals(0, subject.position());
assertEquals(300, subject.remaining());
assertEquals(300, subject.limit());

subject.iterate(100).forEach(buffer -> assertEquals(100, buffer.remaining()));

assertEquals(300, subject.position());
assertEquals(0, subject.remaining());
assertEquals(300, subject.limit());
}

@Test
public void testIterateOverMultipleChunksSmallerChunks() {
ByteBuffer b1 = ByteBuffer.allocate(100);
ByteBuffer b2 = ByteBuffer.allocate(100);
ByteBuffer b3 = ByteBuffer.allocate(100);
ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3));

assertEquals(0, subject.position());
assertEquals(300, subject.remaining());
assertEquals(300, subject.limit());

subject.iterate(50).forEach(buffer -> assertEquals(50, buffer.remaining()));

assertEquals(300, subject.position());
assertEquals(0, subject.remaining());
assertEquals(300, subject.limit());
}

@Test
public void testIterateOverMultipleChunksBiggerChunks() {
ByteBuffer b1 = ByteBuffer.allocate(100);
ByteBuffer b2 = ByteBuffer.allocate(100);
ByteBuffer b3 = ByteBuffer.allocate(100);
ByteBuffer b4 = ByteBuffer.allocate(100);
ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3, b4));

assertEquals(0, subject.position());
assertEquals(400, subject.remaining());
assertEquals(400, subject.limit());

subject.iterate(200).forEach(buffer -> assertEquals(200, buffer.remaining()));

assertEquals(400, subject.position());
assertEquals(0, subject.remaining());
assertEquals(400, subject.limit());
}

private static void assertEmpty(ChunkBuffer subject) {
assertEquals(0, subject.position());
assertEquals(0, subject.remaining());
Expand Down