Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public class CodecBuffer implements UncheckedAutoCloseable {
private static class Factory {
private static volatile BiFunction<ByteBuf, Object, CodecBuffer> constructor
= CodecBuffer::new;
static void set(BiFunction<ByteBuf, Object, CodecBuffer> f) {
static void set(BiFunction<ByteBuf, Object, CodecBuffer> f, String name) {
constructor = f;
LOG.info("Successfully set constructor to " + f);
LOG.info("Successfully set constructor to {}: {}", name, f);
}

static CodecBuffer newCodecBuffer(ByteBuf buf) {
Expand Down Expand Up @@ -89,7 +89,7 @@ protected void finalize() {
* Note that there is a severe performance penalty for leak detection.
*/
public static void enableLeakDetection() {
Factory.set(LeakDetector::newCodecBuffer);
Factory.set(LeakDetector::newCodecBuffer, "LeakDetector::newCodecBuffer");
}

/** The size of a buffer. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,7 @@ public List<ByteBuffer> asByteBufferList() {

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = 0;
for (ByteBuffer buf : buffers) {
written += BufferUtils.writeFully(channel, buf);
}
final long written = BufferUtils.writeFully(channel, buffers);
findCurrent();
return written;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,7 @@ public List<ByteBuffer> asByteBufferList() {

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = 0;
for (ByteBuffer buf : buffers) {
written += BufferUtils.writeFully(channel, buf);
}
return written;
return BufferUtils.writeFully(channel, buffers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utilities for buffers.
*/
public final class BufferUtils {
public static final Logger LOG = LoggerFactory.getLogger(BufferUtils.class);

private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = {};

/** Utility classes should not be constructed. **/
private BufferUtils() {
Expand Down Expand Up @@ -147,11 +152,38 @@ public static long writeFully(GatheringByteChannel ch, ByteBuffer bb) throws IOE
long written = 0;
while (bb.remaining() > 0) {
int n = ch.write(bb);
if (n <= 0) {
throw new IllegalStateException("no bytes written");
if (n < 0) {
throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch);
}
written += n;
}
return written;
}

public static long writeFully(GatheringByteChannel ch, List<ByteBuffer> buffers) throws IOException {
return BufferUtils.writeFully(ch, buffers.toArray(EMPTY_BYTE_BUFFER_ARRAY));
}

public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) throws IOException {
if (LOG.isDebugEnabled()) {
for (int i = 0; i < buffers.length; i++) {
LOG.debug("buffer[{}]: remaining={}", i, buffers[i].remaining());
}
}

long written = 0;
for (int i = 0; i < buffers.length; i++) {
while (buffers[i].remaining() > 0) {
final long n = ch.write(buffers, i, buffers.length - i);
if (LOG.isDebugEnabled()) {
LOG.debug("buffer[{}]: remaining={}, written={}", i, buffers[i].remaining(), n);
}
if (n < 0) {
throw new IllegalStateException("GatheringByteChannel.write returns " + n + " < 0 for " + ch);
}
written += n;
}
}
return written;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.base.Preconditions.checkElementIndex;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* {@link GatheringByteChannel} implementation for testing. Delegates
Expand All @@ -45,11 +48,32 @@ public long write(ByteBuffer[] srcs, int offset, int length)
checkElementIndex(offset, srcs.length, "offset");
checkElementIndex(offset + length - 1, srcs.length, "offset+length");

long bytes = 0;
for (ByteBuffer b : srcs) {
bytes += write(b);
long fullLength = 0;
for (int i = offset; i < srcs.length; i++) {
fullLength += srcs[i].remaining();
}
return bytes;
if (fullLength <= 0) {
return 0;
}

// simulate partial write by setting a random partial length
final long partialLength = ThreadLocalRandom.current().nextLong(fullLength + 1);

long written = 0;
for (int i = offset; i < srcs.length; i++) {
for (final ByteBuffer src = srcs[i]; src.hasRemaining();) {
final long n = partialLength - written; // write at most n bytes
assertThat(n).isGreaterThanOrEqualTo(0);
if (n == 0) {
return written;
}

final int remaining = src.remaining();
final int adjustment = remaining <= n ? 0 : Math.toIntExact(remaining - n);
written += adjustedWrite(src, adjustment);
}
}
return written;
}

@Override
Expand All @@ -59,21 +83,40 @@ public long write(ByteBuffer[] srcs) throws IOException {

@Override
public int write(ByteBuffer src) throws IOException {
// If src has more than 1 byte left, simulate partial write by adjusting limit.
// Remaining 1 byte should be written on next call.
// This helps verify that the caller ensures buffer is written fully.
final int adjustment = 1;
final boolean limitWrite = src.remaining() > adjustment;
if (limitWrite) {
src.limit(src.limit() - adjustment);
final int remaining = src.remaining();
if (remaining <= 0) {
return 0;
}
try {
return delegate.write(src);
} finally {
if (limitWrite) {
src.limit(src.limit() + adjustment);
}
// Simulate partial write by a random adjustment.
final int adjustment = ThreadLocalRandom.current().nextInt(remaining + 1);
return adjustedWrite(src, adjustment);
}

/** Simulate partial write by the given adjustment. */
private int adjustedWrite(ByteBuffer src, int adjustment) throws IOException {
assertThat(adjustment).isGreaterThanOrEqualTo(0);
final int remaining = src.remaining();
if (remaining <= 0) {
return 0;
}
assertThat(adjustment).isLessThanOrEqualTo(remaining);

final int oldLimit = src.limit();
final int newLimit = oldLimit - adjustment;
src.limit(newLimit);
assertEquals(newLimit, src.limit());
final int toWrite = remaining - adjustment;
assertEquals(toWrite, src.remaining());

final int written = delegate.write(src);
assertEquals(newLimit, src.limit());
assertEquals(toWrite - written, src.remaining());

src.limit(oldLimit);
assertEquals(oldLimit, src.limit());
assertEquals(remaining - written, src.remaining());

return written;
}

@Override
Expand Down
Loading