diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index 1ac293b301bb..87be912bb53a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -58,9 +58,9 @@ public class CodecBuffer implements UncheckedAutoCloseable {
   private static class Factory {
     private static volatile BiFunction constructor = CodecBuffer::new;
 
-    static void set(BiFunction f) {
+    static void set(BiFunction f, String name) {
       constructor = f;
-      LOG.info("Successfully set constructor to " + f);
+      LOG.info("Successfully set constructor to {}: {}", name, f);
     }
 
     static CodecBuffer newCodecBuffer(ByteBuf buf) {
@@ -89,7 +89,7 @@ protected void finalize() {
    * Note that there is a severe performance penalty for leak detection.
    */
   public static void enableLeakDetection() {
-    Factory.set(LeakDetector::newCodecBuffer);
+    Factory.set(LeakDetector::newCodecBuffer, "LeakDetector::newCodecBuffer");
   }
 
   /** The size of a buffer. */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index f9992c9442db..e1f169662f89 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -248,10 +248,7 @@ public List<ByteBuffer> asByteBufferList() {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    long written = 0;
-    for (ByteBuffer buf : buffers) {
-      written += BufferUtils.writeFully(channel, buf);
-    }
+    final long written = BufferUtils.writeFully(channel, buffers);
     findCurrent();
     return written;
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index 249c67e4dd3e..732af4b68505 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -280,11 +280,7 @@ public List<ByteBuffer> asByteBufferList() {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    long written = 0;
-    for (ByteBuffer buf : buffers) {
-      written += BufferUtils.writeFully(channel, buf);
-    }
-    return written;
+    return BufferUtils.writeFully(channel, buffers);
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index 01b2ec0af10a..a266c3615b07 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -26,11 +26,16 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utilities for buffers.
  */
 public final class BufferUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(BufferUtils.class);
+
+  private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = {};
 
   /** Utility classes should not be constructed. **/
   private BufferUtils() {
@@ -147,11 +152,38 @@ public static long writeFully(GatheringByteChannel ch, ByteBuffer bb) throws IOException
     long written = 0;
     while (bb.remaining() > 0) {
       int n = ch.write(bb);
-      if (n <= 0) {
-        throw new IllegalStateException("no bytes written");
+      if (n < 0) {
+        throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch);
       }
       written += n;
     }
     return written;
   }
+
+  public static long writeFully(GatheringByteChannel ch, List<ByteBuffer> buffers) throws IOException {
+    return BufferUtils.writeFully(ch, buffers.toArray(EMPTY_BYTE_BUFFER_ARRAY));
+  }
+
+  public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      for (int i = 0; i < buffers.length; i++) {
+        LOG.debug("buffer[{}]: remaining={}", i, buffers[i].remaining());
+      }
+    }
+
+    long written = 0;
+    for (int i = 0; i < buffers.length; i++) {
+      while (buffers[i].remaining() > 0) {
+        final long n = ch.write(buffers, i, buffers.length - i);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("buffer[{}]: remaining={}, written={}", i, buffers[i].remaining(), n);
+        }
+        if (n < 0) {
+          throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch);
+        }
+        written += n;
+      }
+    }
+    return written;
+  }
 }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
index 8f9256cd7787..83b68512380e 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
@@ -21,8 +21,11 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * {@link GatheringByteChannel} implementation for testing. Delegates
@@ -45,11 +48,32 @@ public long write(ByteBuffer[] srcs, int offset, int length)
     checkElementIndex(offset, srcs.length, "offset");
     checkElementIndex(offset + length - 1, srcs.length, "offset+length");
 
-    long bytes = 0;
-    for (ByteBuffer b : srcs) {
-      bytes += write(b);
+    long fullLength = 0;
+    for (int i = offset; i < srcs.length; i++) {
+      fullLength += srcs[i].remaining();
     }
-    return bytes;
+    if (fullLength <= 0) {
+      return 0;
+    }
+
+    // simulate partial write by setting a random partial length
+    final long partialLength = ThreadLocalRandom.current().nextLong(fullLength + 1);
+
+    long written = 0;
+    for (int i = offset; i < srcs.length; i++) {
+      for (final ByteBuffer src = srcs[i]; src.hasRemaining();) {
+        final long n = partialLength - written; // write at most n bytes
+        assertThat(n).isGreaterThanOrEqualTo(0);
+        if (n == 0) {
+          return written;
+        }
+
+        final int remaining = src.remaining();
+        final int adjustment = remaining <= n ? 0 : Math.toIntExact(remaining - n);
+        written += adjustedWrite(src, adjustment);
+      }
+    }
+    return written;
   }
 
   @Override
@@ -59,21 +83,40 @@ public long write(ByteBuffer[] srcs) throws IOException {
 
   @Override
   public int write(ByteBuffer src) throws IOException {
-    // If src has more than 1 byte left, simulate partial write by adjusting limit.
-    // Remaining 1 byte should be written on next call.
-    // This helps verify that the caller ensures buffer is written fully.
-    final int adjustment = 1;
-    final boolean limitWrite = src.remaining() > adjustment;
-    if (limitWrite) {
-      src.limit(src.limit() - adjustment);
+    final int remaining = src.remaining();
+    if (remaining <= 0) {
+      return 0;
     }
-    try {
-      return delegate.write(src);
-    } finally {
-      if (limitWrite) {
-        src.limit(src.limit() + adjustment);
-      }
+    // Simulate partial write by a random adjustment.
+    final int adjustment = ThreadLocalRandom.current().nextInt(remaining + 1);
+    return adjustedWrite(src, adjustment);
+  }
+
+  /** Simulate partial write by the given adjustment. */
+  private int adjustedWrite(ByteBuffer src, int adjustment) throws IOException {
+    assertThat(adjustment).isGreaterThanOrEqualTo(0);
+    final int remaining = src.remaining();
+    if (remaining <= 0) {
+      return 0;
     }
+    assertThat(adjustment).isLessThanOrEqualTo(remaining);
+
+    final int oldLimit = src.limit();
+    final int newLimit = oldLimit - adjustment;
+    src.limit(newLimit);
+    assertEquals(newLimit, src.limit());
+    final int toWrite = remaining - adjustment;
+    assertEquals(toWrite, src.remaining());
+
+    final int written = delegate.write(src);
+    assertEquals(newLimit, src.limit());
+    assertEquals(toWrite - written, src.remaining());
+
+    src.limit(oldLimit);
+    assertEquals(oldLimit, src.limit());
+    assertEquals(remaining - written, src.remaining());
+
+    return written;
   }
 
   @Override
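
For illustration only (not part of the patch above): a minimal sketch of how the new gathering overload BufferUtils.writeFully(GatheringByteChannel, ByteBuffer[]) could be called by client code, assuming a local FileChannel as the channel. The class name WriteFullyDemo and the output file name demo.bin are hypothetical.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import org.apache.hadoop.ozone.common.utils.BufferUtils;

/** Hypothetical demo; exercises the gathering writeFully added in this diff. */
public final class WriteFullyDemo {
  private WriteFullyDemo() { }

  public static void main(String[] args) throws IOException {
    final ByteBuffer[] buffers = {
        ByteBuffer.wrap("hello ".getBytes(StandardCharsets.UTF_8)),
        ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8))
    };
    // FileChannel implements GatheringByteChannel, so writeFully can pass the
    // whole array to channel.write(buffers, i, n) and retry partial writes
    // until every buffer has no remaining bytes.
    try (FileChannel channel = FileChannel.open(Paths.get("demo.bin"),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      final long written = BufferUtils.writeFully(channel, buffers);
      System.out.println("written = " + written); // 11 bytes for "hello world"
    }
  }
}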