diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index 7c3a0c7d2d5..a3b5f9d2eef 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -61,6 +61,7 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
 
   private void findCurrent() {
     boolean found = false;
+    limitPrecedingCurrent = 0;
     for (int i = 0; i < buffers.size(); i++) {
       final ByteBuffer buf = buffers.get(i);
       final int pos = buf.position();
@@ -185,6 +186,8 @@ public ChunkBuffer duplicate(int newPosition, int newLimit) {
    */
   @Override
   public Iterable<ByteBuffer> iterate(int bufferSize) {
+    Preconditions.checkArgument(bufferSize > 0);
+
     return () -> new Iterator<ByteBuffer>() {
       @Override
       public boolean hasNext() {
@@ -198,10 +201,40 @@ public ByteBuffer next() {
         }
         findCurrent();
         ByteBuffer current = buffers.get(currentIndex);
-        final ByteBuffer duplicated = current.duplicate();
-        duplicated.limit(current.limit());
-        current.position(current.limit());
-        return duplicated;
+
+        // If the current buffer has enough remaining data, or it is the last one, return a slice of it.
+        if (current.remaining() >= bufferSize || currentIndex == buffers.size() - 1) {
+          final ByteBuffer duplicated = current.duplicate();
+          int duplicatedLimit = Math.min(current.position() + bufferSize, current.limit());
+          duplicated.limit(duplicatedLimit);
+          duplicated.position(current.position());
+
+          current.position(duplicatedLimit);
+          return duplicated;
+        }
+
+        // Otherwise, copy data spanning several buffers into a newly allocated buffer.
+        int newBufferSize = Math.min(bufferSize, remaining());
+        ByteBuffer allocated = ByteBuffer.allocate(newBufferSize);
+        int remainingToFill = allocated.remaining();
+
+        while (remainingToFill > 0) {
+          final ByteBuffer b = current();
+          int bytes = Math.min(b.remaining(), remainingToFill);
+          b.limit(b.position() + bytes);
+          allocated.put(b);
+          remainingToFill -= bytes;
+          advanceCurrent();
+        }
+
+        allocated.flip();
+
+        // Refresh the local reference to the current buffer.
+        current = buffers.get(currentIndex);
+        // Reset its limit, which may have been reduced while filling.
+        current.limit(current.capacity());
+
+        return allocated;
       }
     };
   }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
index 072c07be64f..b06b4b56332 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
@@ -67,6 +67,76 @@ public void rejectsMultipleCurrentBuffers() {
     assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
   }
 
+  @Test
+  public void testIterateSmallerOverSingleChunk() {
+    ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(ByteBuffer.allocate(100)));
+
+    assertEquals(0, subject.position());
+    assertEquals(100, subject.remaining());
+    assertEquals(100, subject.limit());
+
+    subject.iterate(25).forEach(buffer -> assertEquals(25, buffer.remaining()));
+
+    assertEquals(100, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(100, subject.limit());
+  }
+
+  @Test
+  public void testIterateOverMultipleChunksFitChunkSize() {
+    ByteBuffer b1 = ByteBuffer.allocate(100);
+    ByteBuffer b2 = ByteBuffer.allocate(100);
+    ByteBuffer b3 = ByteBuffer.allocate(100);
+    ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3));
+
+    assertEquals(0, subject.position());
+    assertEquals(300, subject.remaining());
+    assertEquals(300, subject.limit());
+
+    subject.iterate(100).forEach(buffer -> assertEquals(100, buffer.remaining()));
+
+    assertEquals(300, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(300, subject.limit());
+  }
+
+  @Test
+  public void testIterateOverMultipleChunksSmallerChunks() {
+    ByteBuffer b1 = ByteBuffer.allocate(100);
+    ByteBuffer b2 = ByteBuffer.allocate(100);
+    ByteBuffer b3 = ByteBuffer.allocate(100);
+    ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3));
+
+    assertEquals(0, subject.position());
+    assertEquals(300, subject.remaining());
+    assertEquals(300, subject.limit());
+
+    subject.iterate(50).forEach(buffer -> assertEquals(50, buffer.remaining()));
+
+    assertEquals(300, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(300, subject.limit());
+  }
+
+  @Test
+  public void testIterateOverMultipleChunksBiggerChunks() {
+    ByteBuffer b1 = ByteBuffer.allocate(100);
+    ByteBuffer b2 = ByteBuffer.allocate(100);
+    ByteBuffer b3 = ByteBuffer.allocate(100);
+    ByteBuffer b4 = ByteBuffer.allocate(100);
+    ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of(b1, b2, b3, b4));
+
+    assertEquals(0, subject.position());
+    assertEquals(400, subject.remaining());
+    assertEquals(400, subject.limit());
+
+    subject.iterate(200).forEach(buffer -> assertEquals(200, buffer.remaining()));
+
+    assertEquals(400, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(400, subject.limit());
+  }
+
   private static void assertEmpty(ChunkBuffer subject) {
     assertEquals(0, subject.position());
     assertEquals(0, subject.remaining());
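Usage note (not part of the patch): a minimal sketch of how the changed iterate(bufferSize) is exercised, mirroring the tests above. It assumes the ChunkBuffer.wrap(List<ByteBuffer>) factory shown in the test file and Guava's ImmutableList; the class name ChunkBufferIterateSketch is illustrative only.

    import com.google.common.collect.ImmutableList;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.ozone.common.ChunkBuffer;

    public class ChunkBufferIterateSketch {
      public static void main(String[] args) {
        // Three 100-byte backing buffers, 300 bytes in total.
        ChunkBuffer data = ChunkBuffer.wrap(ImmutableList.of(
            ByteBuffer.allocate(100), ByteBuffer.allocate(100), ByteBuffer.allocate(100)));

        // bufferSize no larger than a single backing buffer: each returned
        // ByteBuffer is a duplicate of part of one backing buffer (no copy).
        // Prints "50" six times for this 300-byte ChunkBuffer.
        data.iterate(50).forEach(b -> System.out.println(b.remaining()));
      }
    }

When bufferSize exceeds the current backing buffer and it is not the last one, the new code path instead allocates a bufferSize-sized ByteBuffer and copies data across buffer boundaries, as exercised by testIterateOverMultipleChunksBiggerChunks above.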