diff --git a/api/src/main/java/io/grpc/Detachable.java b/api/src/main/java/io/grpc/Detachable.java
new file mode 100644
index 00000000000..c0cbf016f5b
--- /dev/null
+++ b/api/src/main/java/io/grpc/Detachable.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2021 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc;
+
+import java.io.InputStream;
+
+/**
+ * An extension of {@link InputStream} that allows the underlying data source to be detached and
+ * transferred to a new instance of the same kind. The detached InputStream takes over the
+ * ownership of the underlying data source. That's said, the detached InputStream is responsible
+ * for releasing its resources after use. The detached InputStream preserves internal states of
+ * the underlying data source. Data can be consumed through the detached InputStream as if being
+ * continually consumed through the original instance. The original instance discards internal
+ * states of detached data source and is no longer consumable as if the data source is exhausted.
+ *
+ *
A normal usage of this API is to extend the lifetime of the data source owned by the
+ * original instance for doing extra processing before releasing it. For example, when combined
+ * with {@link HasByteBuffer}, a custom {@link io.grpc.MethodDescriptor.Marshaller} can take
+ * over the ownership of buffers containing inbound data and perform delayed deserialization.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/7387")
+public interface Detachable {
+
+ /**
+ * Detaches the underlying data source from this instance and transfers to an {@link
+ * InputStream}. Detaching data from an already-detached instance gives an InputStream with
+ * zero bytes of data.
+ *
+ */
+ InputStream detach();
+}
diff --git a/api/src/main/java/io/grpc/HasByteBuffer.java b/api/src/main/java/io/grpc/HasByteBuffer.java
new file mode 100644
index 00000000000..97f2435524a
--- /dev/null
+++ b/api/src/main/java/io/grpc/HasByteBuffer.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+
+/**
+ * Extension to an {@link java.io.InputStream} whose content can be accessed as {@link
+ * ByteBuffer}s.
+ *
+ *
This can be used for optimizing the case for the consumer of a {@link ByteBuffer}-backed
+ * input stream supports efficient reading from {@link ByteBuffer}s directly. This turns the reader
+ * interface from an {@link java.io.InputStream} to {@link ByteBuffer}s, without copying the
+ * content to a byte array and read from it.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/7387")
+public interface HasByteBuffer {
+
+ /**
+ * Indicates whether or not {@link #getByteBuffer} operation is supported.
+ */
+ boolean byteBufferSupported();
+
+ /**
+ * Gets a {@link ByteBuffer} containing some bytes of the content next to be read, or {@code
+ * null} if has reached end of the content. The number of bytes contained in the returned buffer
+ * is implementation specific. Calling this method does not change the position of the input
+ * stream. The returned buffer's content should not be modified, but the position, limit, and
+ * mark may be changed. Operations for changing the position, limit, and mark of the returned
+ * buffer does not affect the position, limit, and mark of this input stream. This is an optional
+ * method, so callers should first check {@link #byteBufferSupported}.
+ *
+ * @throws UnsupportedOperationException if this operation is not supported.
+ */
+ @Nullable
+ ByteBuffer getByteBuffer();
+}
diff --git a/core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java b/core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java
index e43b7a7cc0e..16c046dfc36 100644
--- a/core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java
+++ b/core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java
@@ -16,6 +16,8 @@
package io.grpc.internal;
+import java.nio.ByteBuffer;
+
/**
* Abstract base class for {@link ReadableBuffer} implementations.
*/
@@ -45,6 +47,29 @@ public int arrayOffset() {
throw new UnsupportedOperationException();
}
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public void mark() {}
+
+ @Override
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean byteBufferSupported() {
+ return false;
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void close() {}
diff --git a/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java b/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
index 34021d8a82b..9baec34b189 100644
--- a/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
+++ b/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
@@ -20,8 +20,10 @@
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
import java.util.ArrayDeque;
-import java.util.Queue;
+import java.util.Deque;
+import javax.annotation.Nullable;
/**
* A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
@@ -33,15 +35,17 @@
*/
public class CompositeReadableBuffer extends AbstractReadableBuffer {
+ private final Deque readableBuffers;
+ private Deque rewindableBuffers;
private int readableBytes;
- private final Queue buffers;
+ private boolean marked;
public CompositeReadableBuffer(int initialCapacity) {
- buffers = new ArrayDeque<>(initialCapacity);
+ readableBuffers = new ArrayDeque<>(initialCapacity);
}
public CompositeReadableBuffer() {
- buffers = new ArrayDeque<>();
+ readableBuffers = new ArrayDeque<>();
}
/**
@@ -51,16 +55,24 @@ public CompositeReadableBuffer() {
* this {@code CompositeBuffer}.
*/
public void addBuffer(ReadableBuffer buffer) {
+ boolean markHead = marked && readableBuffers.isEmpty();
+ enqueueBuffer(buffer);
+ if (markHead) {
+ readableBuffers.peek().mark();
+ }
+ }
+
+ private void enqueueBuffer(ReadableBuffer buffer) {
if (!(buffer instanceof CompositeReadableBuffer)) {
- buffers.add(buffer);
+ readableBuffers.add(buffer);
readableBytes += buffer.readableBytes();
return;
}
CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
- while (!compositeBuffer.buffers.isEmpty()) {
- ReadableBuffer subBuffer = compositeBuffer.buffers.remove();
- buffers.add(subBuffer);
+ while (!compositeBuffer.readableBuffers.isEmpty()) {
+ ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove();
+ readableBuffers.add(subBuffer);
}
readableBytes += compositeBuffer.readableBytes;
compositeBuffer.readableBytes = 0;
@@ -158,22 +170,27 @@ public ReadableBuffer readBytes(int length) {
ReadableBuffer newBuffer = null;
CompositeReadableBuffer newComposite = null;
do {
- ReadableBuffer buffer = buffers.peek();
+ ReadableBuffer buffer = readableBuffers.peek();
int readable = buffer.readableBytes();
ReadableBuffer readBuffer;
if (readable > length) {
readBuffer = buffer.readBytes(length);
length = 0;
} else {
- readBuffer = buffers.poll();
+ if (marked) {
+ readBuffer = buffer.readBytes(readable);
+ advanceBuffer();
+ } else {
+ readBuffer = readableBuffers.poll();
+ }
length -= readable;
}
if (newBuffer == null) {
newBuffer = readBuffer;
} else {
if (newComposite == null) {
- newComposite =
- new CompositeReadableBuffer(length == 0 ? 2 : Math.min(buffers.size() + 2, 16));
+ newComposite = new CompositeReadableBuffer(
+ length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16));
newComposite.addBuffer(newBuffer);
newBuffer = newComposite;
}
@@ -183,10 +200,77 @@ public ReadableBuffer readBytes(int length) {
return newBuffer;
}
+ @Override
+ public boolean markSupported() {
+ for (ReadableBuffer buffer : readableBuffers) {
+ if (!buffer.markSupported()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void mark() {
+ if (rewindableBuffers == null) {
+ rewindableBuffers = new ArrayDeque<>(Math.min(readableBuffers.size(), 16));
+ }
+ while (!rewindableBuffers.isEmpty()) {
+ rewindableBuffers.remove().close();
+ }
+ marked = true;
+ ReadableBuffer buffer = readableBuffers.peek();
+ if (buffer != null) {
+ buffer.mark();
+ }
+ }
+
+ @Override
+ public void reset() {
+ if (!marked) {
+ throw new InvalidMarkException();
+ }
+ ReadableBuffer buffer;
+ if ((buffer = readableBuffers.peek()) != null) {
+ int currentRemain = buffer.readableBytes();
+ buffer.reset();
+ readableBytes += (buffer.readableBytes() - currentRemain);
+ }
+ while ((buffer = rewindableBuffers.pollLast()) != null) {
+ buffer.reset();
+ readableBuffers.addFirst(buffer);
+ readableBytes += buffer.readableBytes();
+ }
+ }
+
+ @Override
+ public boolean byteBufferSupported() {
+ for (ReadableBuffer buffer : readableBuffers) {
+ if (!buffer.byteBufferSupported()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Nullable
+ @Override
+ public ByteBuffer getByteBuffer() {
+ if (readableBuffers.isEmpty()) {
+ return null;
+ }
+ return readableBuffers.peek().getByteBuffer();
+ }
+
@Override
public void close() {
- while (!buffers.isEmpty()) {
- buffers.remove().close();
+ while (!readableBuffers.isEmpty()) {
+ readableBuffers.remove().close();
+ }
+ if (rewindableBuffers != null) {
+ while (!rewindableBuffers.isEmpty()) {
+ rewindableBuffers.remove().close();
+ }
}
}
@@ -197,12 +281,12 @@ public void close() {
private int execute(ReadOperation op, int length, T dest, int value) throws IOException {
checkReadable(length);
- if (!buffers.isEmpty()) {
+ if (!readableBuffers.isEmpty()) {
advanceBufferIfNecessary();
}
- for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) {
- ReadableBuffer buffer = buffers.peek();
+ for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) {
+ ReadableBuffer buffer = readableBuffers.peek();
int lengthToCopy = Math.min(length, buffer.readableBytes());
// Perform the read operation for this buffer.
@@ -232,9 +316,24 @@ private int executeNoThrow(NoThrowReadOperation op, int length, T dest, i
* If the current buffer is exhausted, removes and closes it.
*/
private void advanceBufferIfNecessary() {
- ReadableBuffer buffer = buffers.peek();
+ ReadableBuffer buffer = readableBuffers.peek();
if (buffer.readableBytes() == 0) {
- buffers.remove().close();
+ advanceBuffer();
+ }
+ }
+
+ /**
+ * Removes one buffer from the front and closes it.
+ */
+ private void advanceBuffer() {
+ if (marked) {
+ rewindableBuffers.add(readableBuffers.remove());
+ ReadableBuffer next = readableBuffers.peek();
+ if (next != null) {
+ next.mark();
+ }
+ } else {
+ readableBuffers.remove().close();
}
}
diff --git a/core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java b/core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java
index 954d0ac5486..1d7b412e195 100644
--- a/core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java
+++ b/core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
/**
* Base class for a wrapper around another {@link ReadableBuffer}.
@@ -96,6 +97,32 @@ public int arrayOffset() {
return buf.arrayOffset();
}
+ @Override
+ public boolean markSupported() {
+ return buf.markSupported();
+ }
+
+ @Override
+ public void mark() {
+ buf.mark();
+ }
+
+ @Override
+ public void reset() {
+ buf.reset();
+ }
+
+ @Override
+ public boolean byteBufferSupported() {
+ return buf.byteBufferSupported();
+ }
+
+ @Nullable
+ @Override
+ public ByteBuffer getByteBuffer() {
+ return buf.getByteBuffer();
+ }
+
@Override
public void close() {
buf.close();
diff --git a/core/src/main/java/io/grpc/internal/ReadableBuffer.java b/core/src/main/java/io/grpc/internal/ReadableBuffer.java
index 7d2ca7ebba5..b47501a9943 100644
--- a/core/src/main/java/io/grpc/internal/ReadableBuffer.java
+++ b/core/src/main/java/io/grpc/internal/ReadableBuffer.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
/**
* Interface for an abstract byte buffer. Buffers are intended to be a read-only, except for the
@@ -123,6 +124,44 @@ public interface ReadableBuffer extends Closeable {
*/
int arrayOffset();
+ /**
+ * Indicates whether or not {@link #mark} operation is supported for this buffer.
+ */
+ boolean markSupported();
+
+ /**
+ * Marks the current position in this buffer. A subsequent call to the {@link #reset} method
+ * repositions this stream at the last marked position so that subsequent reads re-read the same
+ * bytes.
+ */
+ void mark();
+
+ /**
+ * Repositions this buffer to the position at the time {@link #mark} was last called on this
+ * buffer.
+ */
+ void reset();
+
+ /**
+ * Indicates whether or not {@link #getByteBuffer} operation is supported for this buffer.
+ */
+ boolean byteBufferSupported();
+
+ /**
+ * Gets a {@link ByteBuffer} that contains some bytes of the content next to be read, or {@code
+ * null} if this buffer has been exhausted. The number of bytes contained in the returned buffer
+ * is implementation specific. The position of this buffer is unchanged after calling this
+ * method. The returned buffer's content should not be modified, but the position, limit, and
+ * mark may be changed. Operations for changing the position, limit, and mark of the returned
+ * buffer does not affect the position, limit, and mark of this buffer. Buffers returned by this
+ * method have independent position, limit and mark. This is an optional method, so callers
+ * should first check {@link #byteBufferSupported}.
+ *
+ * @throws UnsupportedOperationException the buffer does not support this method.
+ */
+ @Nullable
+ ByteBuffer getByteBuffer();
+
/**
* Closes this buffer and releases any resources.
*/
diff --git a/core/src/main/java/io/grpc/internal/ReadableBuffers.java b/core/src/main/java/io/grpc/internal/ReadableBuffers.java
index cfe5542a573..c54cb0e67d0 100644
--- a/core/src/main/java/io/grpc/internal/ReadableBuffers.java
+++ b/core/src/main/java/io/grpc/internal/ReadableBuffers.java
@@ -19,13 +19,17 @@
import static com.google.common.base.Charsets.UTF_8;
import com.google.common.base.Preconditions;
+import io.grpc.Detachable;
+import io.grpc.HasByteBuffer;
import io.grpc.KnownLength;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
import java.nio.charset.Charset;
+import javax.annotation.Nullable;
/**
* Utility methods for creating {@link ReadableBuffer} instances.
@@ -128,6 +132,7 @@ private static class ByteArrayWrapper extends AbstractReadableBuffer {
int offset;
final int end;
final byte[] bytes;
+ int mark = -1;
ByteArrayWrapper(byte[] bytes) {
this(bytes, 0, bytes.length);
@@ -204,6 +209,24 @@ public byte[] array() {
public int arrayOffset() {
return offset;
}
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public void mark() {
+ mark = offset;
+ }
+
+ @Override
+ public void reset() {
+ if (mark == -1) {
+ throw new InvalidMarkException();
+ }
+ offset = mark;
+ }
}
/**
@@ -291,13 +314,39 @@ public byte[] array() {
public int arrayOffset() {
return bytes.arrayOffset() + bytes.position();
}
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public void mark() {
+ bytes.mark();
+ }
+
+ @Override
+ public void reset() {
+ bytes.reset();
+ }
+
+ @Override
+ public boolean byteBufferSupported() {
+ return true;
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer() {
+ return bytes.slice();
+ }
}
/**
* An {@link InputStream} that is backed by a {@link ReadableBuffer}.
*/
- private static final class BufferInputStream extends InputStream implements KnownLength {
- final ReadableBuffer buffer;
+ private static final class BufferInputStream extends InputStream
+ implements KnownLength, HasByteBuffer, Detachable {
+ private ReadableBuffer buffer;
public BufferInputStream(ReadableBuffer buffer) {
this.buffer = Preconditions.checkNotNull(buffer, "buffer");
@@ -329,6 +378,46 @@ public int read(byte[] dest, int destOffset, int length) throws IOException {
return length;
}
+ @Override
+ public long skip(long n) throws IOException {
+ int length = (int) Math.min(buffer.readableBytes(), n);
+ buffer.skipBytes(length);
+ return length;
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ buffer.mark();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ buffer.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return buffer.markSupported();
+ }
+
+ @Override
+ public boolean byteBufferSupported() {
+ return buffer.byteBufferSupported();
+ }
+
+ @Nullable
+ @Override
+ public ByteBuffer getByteBuffer() {
+ return buffer.getByteBuffer();
+ }
+
+ @Override
+ public InputStream detach() {
+ ReadableBuffer detachedBuffer = buffer;
+ buffer = buffer.readBytes(0);
+ return new BufferInputStream(detachedBuffer);
+ }
+
@Override
public void close() throws IOException {
buffer.close();
diff --git a/core/src/test/java/io/grpc/internal/CompositeReadableBufferTest.java b/core/src/test/java/io/grpc/internal/CompositeReadableBufferTest.java
index 660aa116317..011d83b548a 100644
--- a/core/src/test/java/io/grpc/internal/CompositeReadableBufferTest.java
+++ b/core/src/test/java/io/grpc/internal/CompositeReadableBufferTest.java
@@ -17,14 +17,20 @@
package io.grpc.internal;
import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -154,6 +160,145 @@ public void readStreamShouldSucceed() throws IOException {
assertEquals(EXPECTED_VALUE, new String(bos.toByteArray(), UTF_8));
}
+ @Test
+ public void markSupportedOnlyAllComponentsSupportMark() {
+ composite = new CompositeReadableBuffer();
+ ReadableBuffer buffer1 = mock(ReadableBuffer.class);
+ ReadableBuffer buffer2 = mock(ReadableBuffer.class);
+ ReadableBuffer buffer3 = mock(ReadableBuffer.class);
+ when(buffer1.markSupported()).thenReturn(true);
+ when(buffer2.markSupported()).thenReturn(true);
+ when(buffer3.markSupported()).thenReturn(false);
+ composite.addBuffer(buffer1);
+ assertTrue(composite.markSupported());
+ composite.addBuffer(buffer2);
+ assertTrue(composite.markSupported());
+ composite.addBuffer(buffer3);
+ assertFalse(composite.markSupported());
+ }
+
+ @Test
+ public void resetUnmarkedShouldThrow() {
+ try {
+ composite.reset();
+ fail();
+ } catch (InvalidMarkException expected) {
+ }
+ }
+
+ @Test
+ public void markAndResetWithSkipBytesShouldSucceed() {
+ composite.mark();
+ composite.skipBytes(EXPECTED_VALUE.length() / 2);
+ composite.reset();
+ assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
+ }
+
+ @Test
+ public void markAndResetWithReadUnsignedByteShouldSucceed() {
+ composite.readUnsignedByte();
+ composite.mark();
+ int b = composite.readUnsignedByte();
+ composite.reset();
+ assertEquals(EXPECTED_VALUE.length() - 1, composite.readableBytes());
+ assertEquals(b, composite.readUnsignedByte());
+ }
+
+ @Test
+ public void markAndResetWithReadByteArrayShouldSucceed() {
+ composite.mark();
+ byte[] first = new byte[EXPECTED_VALUE.length()];
+ composite.readBytes(first, 0, EXPECTED_VALUE.length());
+ composite.reset();
+ assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
+ byte[] second = new byte[EXPECTED_VALUE.length()];
+ composite.readBytes(second, 0, EXPECTED_VALUE.length());
+ assertArrayEquals(first, second);
+ }
+
+ @Test
+ public void markAndResetWithReadByteBufferShouldSucceed() {
+ byte[] first = new byte[EXPECTED_VALUE.length()];
+ composite.mark();
+ composite.readBytes(ByteBuffer.wrap(first));
+ composite.reset();
+ byte[] second = new byte[EXPECTED_VALUE.length()];
+ assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
+ composite.readBytes(ByteBuffer.wrap(second));
+ assertArrayEquals(first, second);
+ }
+
+ @Test
+ public void markAndResetWithReadStreamShouldSucceed() throws IOException {
+ ByteArrayOutputStream first = new ByteArrayOutputStream();
+ composite.mark();
+ composite.readBytes(first, EXPECTED_VALUE.length() / 2);
+ composite.reset();
+ assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
+ ByteArrayOutputStream second = new ByteArrayOutputStream();
+ composite.readBytes(second, EXPECTED_VALUE.length() / 2);
+ assertArrayEquals(first.toByteArray(), second.toByteArray());
+ }
+
+ @Test
+ public void markAndResetWithReadReadableBufferShouldSucceed() {
+ composite.readBytes(EXPECTED_VALUE.length() / 2);
+ int remaining = composite.readableBytes();
+ composite.mark();
+ ReadableBuffer first = composite.readBytes(1);
+ composite.reset();
+ assertEquals(remaining, composite.readableBytes());
+ ReadableBuffer second = composite.readBytes(1);
+ assertEquals(first.readUnsignedByte(), second.readUnsignedByte());
+ }
+
+ @Test
+ public void markAgainShouldOverwritePreviousMark() {
+ composite.mark();
+ composite.skipBytes(EXPECTED_VALUE.length() / 2);
+ int remaining = composite.readableBytes();
+ composite.mark();
+ composite.skipBytes(1);
+ composite.reset();
+ assertEquals(remaining, composite.readableBytes());
+ }
+
+ @Test
+ public void bufferAddedAfterMarkedShouldBeIncluded() {
+ composite = new CompositeReadableBuffer();
+ composite.mark();
+ splitAndAdd(EXPECTED_VALUE);
+ composite.skipBytes(EXPECTED_VALUE.length() / 2);
+ composite.reset();
+ assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
+ }
+
+ @Test
+ public void canUseByteBufferOnlyAllComponentsSupportUsingByteBuffer() {
+ composite = new CompositeReadableBuffer();
+ ReadableBuffer buffer1 = mock(ReadableBuffer.class);
+ ReadableBuffer buffer2 = mock(ReadableBuffer.class);
+ ReadableBuffer buffer3 = mock(ReadableBuffer.class);
+ when(buffer1.byteBufferSupported()).thenReturn(true);
+ when(buffer2.byteBufferSupported()).thenReturn(true);
+ when(buffer3.byteBufferSupported()).thenReturn(false);
+ composite.addBuffer(buffer1);
+ assertTrue(composite.byteBufferSupported());
+ composite.addBuffer(buffer2);
+ assertTrue(composite.byteBufferSupported());
+ composite.addBuffer(buffer3);
+ assertFalse(composite.byteBufferSupported());
+ }
+
+ @Test
+ public void getByteBufferDelegatesToComponents() {
+ composite = new CompositeReadableBuffer();
+ ReadableBuffer buffer = mock(ReadableBuffer.class);
+ composite.addBuffer(buffer);
+ composite.getByteBuffer();
+ verify(buffer).getByteBuffer();
+ }
+
@Test
public void closeShouldCloseBuffers() {
composite = new CompositeReadableBuffer();
diff --git a/core/src/test/java/io/grpc/internal/ReadableBufferTestBase.java b/core/src/test/java/io/grpc/internal/ReadableBufferTestBase.java
index e469b807d51..97e0df38ae7 100644
--- a/core/src/test/java/io/grpc/internal/ReadableBufferTestBase.java
+++ b/core/src/test/java/io/grpc/internal/ReadableBufferTestBase.java
@@ -24,6 +24,7 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -117,6 +118,58 @@ public void partialReadToReadableBufferShouldSucceed() {
assertArrayEquals(new byte[] {'h', 'e'}, Arrays.copyOfRange(array, 0, 2));
}
+ @Test
+ public void markAndResetWithReadShouldSucceed() {
+ ReadableBuffer buffer = buffer();
+ int offset = 5;
+ buffer.readBytes(new byte[offset], 0, offset);
+ buffer.mark();
+ int b = buffer.readUnsignedByte();
+ assertEquals(msg.length() - offset - 1, buffer.readableBytes());
+ buffer.reset();
+ assertEquals(msg.length() - offset, buffer.readableBytes());
+ assertEquals(b, buffer.readUnsignedByte());
+ }
+
+ @Test
+ public void markAndResetWithReadToReadableBufferShouldSucceed() {
+ ReadableBuffer buffer = buffer();
+ int offset = 5;
+ buffer.readBytes(offset);
+ int testLen = 100;
+ buffer.mark();
+ ReadableBuffer first = buffer.readBytes(testLen);
+ assertEquals(msg.length() - offset - testLen, buffer.readableBytes());
+ buffer.reset();
+ assertEquals(msg.length() - offset, buffer.readableBytes());
+ ReadableBuffer second = buffer.readBytes(testLen);
+ byte[] array1 = new byte[testLen];
+ byte[] array2 = new byte[testLen];
+ first.readBytes(array1, 0, testLen);
+ second.readBytes(array2, 0, testLen);
+ assertArrayEquals(array1, array2);
+ }
+
+ @Test
+ public void getByteBufferDoesNotAffectBufferPosition() {
+ ReadableBuffer buffer = buffer();
+ Assume.assumeTrue(buffer.byteBufferSupported());
+ ByteBuffer byteBuffer = buffer.getByteBuffer();
+ assertEquals(msg.length(), buffer.readableBytes());
+ byteBuffer.get(new byte[byteBuffer.remaining()]);
+ assertEquals(msg.length(), buffer.readableBytes());
+ }
+
+ @Test
+ public void getByteBufferIsNotAffectedByBufferRead() {
+ ReadableBuffer buffer = buffer();
+ Assume.assumeTrue(buffer.byteBufferSupported());
+ ByteBuffer byteBuffer = buffer.getByteBuffer();
+ int initialRemaining = byteBuffer.remaining();
+ buffer.readBytes(new byte[100], 0, 100);
+ assertEquals(initialRemaining, byteBuffer.remaining());
+ }
+
protected abstract ReadableBuffer buffer();
private static String repeatUntilLength(String toRepeat, int length) {
diff --git a/core/src/test/java/io/grpc/internal/ReadableBuffersTest.java b/core/src/test/java/io/grpc/internal/ReadableBuffersTest.java
index ea9daeed6a3..0947f65da12 100644
--- a/core/src/test/java/io/grpc/internal/ReadableBuffersTest.java
+++ b/core/src/test/java/io/grpc/internal/ReadableBuffersTest.java
@@ -19,13 +19,22 @@
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.grpc.Detachable;
+import io.grpc.HasByteBuffer;
+import java.io.IOException;
import java.io.InputStream;
+import java.nio.InvalidMarkException;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -37,6 +46,9 @@
public class ReadableBuffersTest {
private static final byte[] MSG_BYTES = "hello".getBytes(UTF_8);
+ @Rule
+ public final ExpectedException thrown = ExpectedException.none();
+
@Test
public void empty_returnsEmptyBuffer() {
ReadableBuffer buffer = ReadableBuffers.empty();
@@ -128,4 +140,108 @@ public void bufferInputStream_close_closesBuffer() throws Exception {
inputStream.close();
verify(buffer, times(1)).close();
}
+
+ @Test
+ public void bufferInputStream_markAndReset() throws IOException {
+ ReadableBuffer buffer = ReadableBuffers.wrap(MSG_BYTES);
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ assertTrue(inputStream.markSupported());
+ inputStream.mark(2);
+ byte[] first = new byte[5];
+ inputStream.read(first);
+ assertEquals(0, inputStream.available());
+ inputStream.reset();
+ assertEquals(5, inputStream.available());
+ byte[] second = new byte[5];
+ inputStream.read(second);
+ assertArrayEquals(first, second);
+ }
+
+ @Test
+ public void bufferInputStream_getByteBufferDelegatesToBuffer() {
+ ReadableBuffer buffer = mock(ReadableBuffer.class);
+ when(buffer.byteBufferSupported()).thenReturn(true);
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ assertTrue(((HasByteBuffer) inputStream).byteBufferSupported());
+ ((HasByteBuffer) inputStream).getByteBuffer();
+ verify(buffer).getByteBuffer();
+ }
+
+ @Test
+ public void bufferInputStream_availableAfterDetached_returnsZeroByte() throws IOException {
+ ReadableBuffer buffer = ReadableBuffers.wrap(MSG_BYTES);
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ assertEquals(5, inputStream.available());
+ InputStream detachedStream = ((Detachable) inputStream).detach();
+ assertEquals(0, inputStream.available());
+ assertEquals(5, detachedStream.available());
+ }
+
+ @Test
+ public void bufferInputStream_skipAfterDetached() throws IOException {
+ ReadableBuffer buffer = ReadableBuffers.wrap(MSG_BYTES);
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ assertEquals(3, inputStream.skip(3));
+ InputStream detachedStream = ((Detachable) inputStream).detach();
+ assertEquals(0, inputStream.skip(2));
+ assertEquals(2, detachedStream.skip(2));
+ }
+
+ @Test
+ public void bufferInputStream_readUnsignedByteAfterDetached() throws IOException {
+ ReadableBuffer buffer = ReadableBuffers.wrap(MSG_BYTES);
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ assertEquals((int) 'h', inputStream.read());
+ InputStream detachedStream = ((Detachable) inputStream).detach();
+ assertEquals(-1, inputStream.read());
+ assertEquals((int) 'e', detachedStream.read());
+ }
+
+ @Test
+ public void bufferInputStream_partialReadAfterDetached() throws IOException {
+ ReadableBuffer buffer = ReadableBuffers.wrap(MSG_BYTES);
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ byte[] dest = new byte[3];
+ assertEquals(3, inputStream.read(dest, /*destOffset*/ 0, /*length*/ 3));
+ assertArrayEquals(new byte[]{'h', 'e', 'l'}, dest);
+ InputStream detachedStream = ((Detachable) inputStream).detach();
+ byte[] newDest = new byte[2];
+ assertEquals(2, detachedStream.read(newDest, /*destOffset*/ 0, /*length*/ 2));
+ assertArrayEquals(new byte[]{'l', 'o'}, newDest);
+ }
+
+ @Test
+ public void bufferInputStream_markDiscardedAfterDetached() throws IOException {
+ ReadableBuffer buffer = ReadableBuffers.wrap(MSG_BYTES);
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ inputStream.mark(5);
+ ((Detachable) inputStream).detach();
+ thrown.expect(InvalidMarkException.class);
+ inputStream.reset();
+ }
+
+ @Test
+ public void bufferInputStream_markPreservedInForkedInputStream() throws IOException {
+ ReadableBuffer buffer = ReadableBuffers.wrap(MSG_BYTES);
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ inputStream.skip(2);
+ inputStream.mark(3);
+ InputStream detachedStream = ((Detachable) inputStream).detach();
+ detachedStream.skip(3);
+ assertEquals(0, detachedStream.available());
+ detachedStream.reset();
+ assertEquals(3, detachedStream.available());
+ }
+
+ @Test
+ public void bufferInputStream_closeAfterDetached() throws IOException {
+ ReadableBuffer buffer = mock(ReadableBuffer.class);
+ when(buffer.readBytes(anyInt())).thenReturn(mock(ReadableBuffer.class));
+ InputStream inputStream = ReadableBuffers.openStream(buffer, true);
+ InputStream detachedStream = ((Detachable) inputStream).detach();
+ inputStream.close();
+ verify(buffer, never()).close();
+ detachedStream.close();
+ verify(buffer).close();
+ }
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java b/netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java
index 37caccb0eb3..cce58f1e60d 100644
--- a/netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java
@@ -94,6 +94,31 @@ public int arrayOffset() {
return buffer.arrayOffset() + buffer.readerIndex();
}
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public void mark() {
+ buffer.markReaderIndex();
+ }
+
+ @Override
+ public void reset() {
+ buffer.resetReaderIndex();
+ }
+
+ @Override
+ public boolean byteBufferSupported() {
+ return buffer.nioBufferCount() > 0;
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer() {
+ return buffer.nioBufferCount() == 1 ? buffer.nioBuffer() : buffer.nioBuffers()[0];
+ }
+
/**
* If the first call to close, calls {@link ByteBuf#release} to release the internal Netty buffer.
*/
diff --git a/netty/src/test/java/io/grpc/netty/NettyReadableBufferTest.java b/netty/src/test/java/io/grpc/netty/NettyReadableBufferTest.java
index 8090e601911..1a0ac229a89 100644
--- a/netty/src/test/java/io/grpc/netty/NettyReadableBufferTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyReadableBufferTest.java
@@ -17,11 +17,16 @@
package io.grpc.netty;
import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import com.google.common.base.Splitter;
import io.grpc.internal.ReadableBuffer;
import io.grpc.internal.ReadableBufferTestBase;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -52,6 +57,29 @@ public void closeMultipleTimesShouldReleaseBufferOnce() {
assertEquals(0, buffer.buffer().refCnt());
}
+ @Test
+ public void getByteBufferFromSingleNioBufferBackedBuffer() {
+ assertTrue(buffer.byteBufferSupported());
+ ByteBuffer byteBuffer = buffer.getByteBuffer();
+ byte[] arr = new byte[byteBuffer.remaining()];
+ byteBuffer.get(arr);
+ assertArrayEquals(msg.getBytes(UTF_8), arr);
+ }
+
+ @Test
+ public void getByteBufferFromCompositeBufferReturnsOnlyFirstComponent() {
+ CompositeByteBuf composite = Unpooled.compositeBuffer(10);
+ int chunks = 4;
+ int chunkLen = msg.length() / chunks;
+ for (String chunk : Splitter.fixedLength(chunkLen).split(msg)) {
+ composite.addComponent(true, Unpooled.copiedBuffer(chunk.getBytes(UTF_8)));
+ }
+ buffer = new NettyReadableBuffer(composite);
+ byte[] array = new byte[chunkLen];
+ buffer.getByteBuffer().get(array);
+ assertArrayEquals(msg.substring(0, chunkLen).getBytes(UTF_8), array);
+ }
+
@Override
protected ReadableBuffer buffer() {
return buffer;
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpReadableBufferTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpReadableBufferTest.java
index 2ece98ffb97..4aeeae2fa8b 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpReadableBufferTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpReadableBufferTest.java
@@ -56,6 +56,18 @@ public void partialReadToByteBufferShouldSucceed() {
// Not supported.
}
+ @Override
+ @Test
+ public void markAndResetWithReadShouldSucceed() {
+ // Not supported.
+ }
+
+ @Override
+ @Test
+ public void markAndResetWithReadToReadableBufferShouldSucceed() {
+ // Not supported.
+ }
+
@Override
protected ReadableBuffer buffer() {
return buffer;