diff --git a/api/src/main/java/io/grpc/ManagedBytes.java b/api/src/main/java/io/grpc/ManagedBytes.java new file mode 100644 index 00000000000..a631d7c0ec6 --- /dev/null +++ b/api/src/main/java/io/grpc/ManagedBytes.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import java.nio.ByteBuffer; +import java.util.List; +import javax.annotation.Nullable; + +/** + * A logical representation of bytes with the responsibility of managing the resource lifecycle + * used to hold the data. Usually used for transferring data by directly hand over the ownership + * of the backing resources. + */ +public abstract class ManagedBytes { + + /** + * Returns the data in a list of {@link ByteBuffer}s. + */ + public abstract List asByteBuffers(); + + /** + * Release the usage of the data. The underlying resource used to hold the data may be released + * and no more access should be attempted. + */ + public void release() { + // no-op + } + + /** + * A readable data source that supports the operation of reading its content into + * {@link ManagedBytes}. + */ + public interface ManagedBytesReadable { + + /** + * Reads up to a total of {@code length} bytes as {@link ManagedBytes}. + * + * @param length the maximum number of bytes to be read. + * @return {@link ManagedBytes} that contains the bytes being read or {@code null} + * if no more bytes can be read. + */ + @Nullable + ManagedBytes readManagedBytes(int length); + } +} diff --git a/core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java b/core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java index e43b7a7cc0e..e007021a985 100644 --- a/core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java +++ b/core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java @@ -16,6 +16,8 @@ package io.grpc.internal; +import io.grpc.ManagedBytes; + /** * Abstract base class for {@link ReadableBuffer} implementations. */ @@ -30,6 +32,16 @@ public final int readInt() { return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4; } + @Override + public boolean shouldUseManagedBytes() { + return false; + } + + @Override + public ManagedBytes readManagedBytes(int length) { + throw new UnsupportedOperationException(); + } + @Override public boolean hasArray() { return false; diff --git a/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java b/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java index 9a9bf5c9266..61d02ef4dc1 100644 --- a/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java +++ b/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java @@ -16,11 +16,15 @@ package io.grpc.internal; +import com.google.common.base.Preconditions; +import io.grpc.ManagedBytes; import java.io.IOException; import java.io.OutputStream; import java.nio.Buffer; import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.Queue; /** @@ -59,6 +63,75 @@ public void addBuffer(ReadableBuffer buffer) { compositeBuffer.close(); } + @Override + public boolean shouldUseManagedBytes() { + for (ReadableBuffer buf : buffers) { + if (!buf.shouldUseManagedBytes()) { + return false; + } + } + return true; + } + + @Override + public ManagedBytes readManagedBytes(int length) { + checkReadable(length); + readableBytes -= length; + + CompositeManagedBytes res = new CompositeManagedBytes(); + while (length > 0) { + ReadableBuffer buffer = buffers.peek(); + if (buffer.readableBytes() > length) { + res.addSharedBytes(buffer.readManagedBytes(length)); + length = 0; + } else { + res.addBuffer(buffers.poll()); + length -= buffer.readableBytes(); + } + } + return res; + } + + private static final class CompositeManagedBytes extends ManagedBytes { + private final List managedBytesList = new ArrayList<>(); + + private void addBuffer(final ReadableBuffer buffer) { + Preconditions.checkArgument( + buffer.shouldUseManagedBytes(), "buffer does not support shared bytes"); + managedBytesList.add(new ManagedBytes() { + @Override + public List asByteBuffers() { + return buffer.readManagedBytes(buffer.readableBytes()).asByteBuffers(); + } + + @Override + public void release() { + buffer.close(); + } + }); + } + + private void addSharedBytes(ManagedBytes managedBytes) { + managedBytesList.add(managedBytes); + } + + @Override + public List asByteBuffers() { + List res = new ArrayList<>(); + for (ManagedBytes managedBytes : managedBytesList) { + res.addAll(managedBytes.asByteBuffers()); + } + return res; + } + + @Override + public void release() { + for (ManagedBytes managedBytes : managedBytesList) { + managedBytes.release(); + } + } + } + @Override public int readableBytes() { return readableBytes; diff --git a/core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java b/core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java index 03fafee768b..8b0593a2a7f 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java @@ -18,6 +18,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import io.grpc.ManagedBytes; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -81,6 +82,16 @@ public ReadableBuffer readBytes(int length) { return buf.readBytes(length); } + @Override + public boolean shouldUseManagedBytes() { + return buf.shouldUseManagedBytes(); + } + + @Override + public ManagedBytes readManagedBytes(int length) { + return buf.readManagedBytes(length); + } + @Override public boolean hasArray() { return buf.hasArray(); diff --git a/core/src/main/java/io/grpc/internal/ReadableBuffer.java b/core/src/main/java/io/grpc/internal/ReadableBuffer.java index 7d2ca7ebba5..c92fe6c9ca9 100644 --- a/core/src/main/java/io/grpc/internal/ReadableBuffer.java +++ b/core/src/main/java/io/grpc/internal/ReadableBuffer.java @@ -16,6 +16,7 @@ package io.grpc.internal; +import io.grpc.ManagedBytes; import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; @@ -102,6 +103,23 @@ public interface ReadableBuffer extends Closeable { */ ReadableBuffer readBytes(int length); + /** + * Indicates whether or not this buffer supports {@link #readManagedBytes} operation that returns + * buffer's content as {@link ManagedBytes}. + */ + boolean shouldUseManagedBytes(); + + /** + * Reads {@code length} bytes as {@link ManagedBytes}. This is an optional method, so callers + * should first check {@link #shouldUseManagedBytes}. Closing this buffer too early may + * result in the returned {@link ManagedBytes} no longer readable. + * + * @param length the total number of bytes to contain in the returned {@link ManagedBytes}s. + * @throws UnsupportedOperationException the buffer does not support this method + * @throws IndexOutOfBoundsException if required bytes are not readable + */ + ManagedBytes readManagedBytes(int length); + /** * Indicates whether or not this buffer exposes a backing array. */ diff --git a/core/src/main/java/io/grpc/internal/ReadableBuffers.java b/core/src/main/java/io/grpc/internal/ReadableBuffers.java index 34805420fa5..099ac2bdf47 100644 --- a/core/src/main/java/io/grpc/internal/ReadableBuffers.java +++ b/core/src/main/java/io/grpc/internal/ReadableBuffers.java @@ -20,12 +20,15 @@ import com.google.common.base.Preconditions; import io.grpc.KnownLength; +import io.grpc.ManagedBytes; +import io.grpc.ManagedBytes.ManagedBytesReadable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import javax.annotation.Nullable; /** * Utility methods for creating {@link ReadableBuffer} instances. @@ -103,7 +106,11 @@ public static String readAsStringUtf8(ReadableBuffer buffer) { * @param owner if {@code true}, the returned stream will close the buffer when closed. */ public static InputStream openStream(ReadableBuffer buffer, boolean owner) { - return new BufferInputStream(owner ? buffer : ignoreClose(buffer)); + if (!owner) { + buffer = ignoreClose(buffer); + } + return buffer.shouldUseManagedBytes() + ? new ManagedBytesInputStream(buffer) : new BufferInputStream(buffer); } /** @@ -297,7 +304,7 @@ public int arrayOffset() { /** * An {@link InputStream} that is backed by a {@link ReadableBuffer}. */ - private static final class BufferInputStream extends InputStream implements KnownLength { + private static class BufferInputStream extends InputStream implements KnownLength { final ReadableBuffer buffer; public BufferInputStream(ReadableBuffer buffer) { @@ -336,5 +343,28 @@ public void close() throws IOException { } } + /** + * A {@link BufferInputStream} that supports data transfer via {@link ManagedBytes}. + */ + private static final class ManagedBytesInputStream extends BufferInputStream + implements ManagedBytesReadable { + + ManagedBytesInputStream(ReadableBuffer buffer) { + super(buffer); + } + + @Nullable + @Override + public ManagedBytes readManagedBytes(int length) { + if (buffer.readableBytes() == 0) { + // EOF. + return null; + } + + length = Math.min(buffer.readableBytes(), length); + return buffer.readManagedBytes(length); + } + } + private ReadableBuffers() {} } diff --git a/netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java b/netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java index 37caccb0eb3..9938626d7c0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java +++ b/netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java @@ -17,11 +17,14 @@ package io.grpc.netty; import com.google.common.base.Preconditions; +import io.grpc.ManagedBytes; import io.grpc.internal.AbstractReadableBuffer; import io.netty.buffer.ByteBuf; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; /** * A {@link java.nio.Buffer} implementation that is backed by a Netty {@link ByteBuf}. This class @@ -79,6 +82,26 @@ public NettyReadableBuffer readBytes(int length) { return new NettyReadableBuffer(buffer.readRetainedSlice(length)); } + @Override + public boolean shouldUseManagedBytes() { + return buffer.nioBufferCount() > 0; + } + + @Override + public ManagedBytes readManagedBytes(int length) { + if (buffer.readableBytes() < length) { + throw new IndexOutOfBoundsException(); + } + final List res = Arrays.asList(buffer.nioBuffers(buffer.readerIndex(), length)); + buffer.skipBytes(length); + return new ManagedBytes() { + @Override + public List asByteBuffers() { + return res; + } + }; + } + @Override public boolean hasArray() { return buffer.hasArray(); diff --git a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java index ddba5b8d5b1..23038186ed2 100644 --- a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java +++ b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java @@ -26,6 +26,8 @@ import com.google.protobuf.Parser; import io.grpc.ExperimentalApi; import io.grpc.KnownLength; +import io.grpc.ManagedBytes; +import io.grpc.ManagedBytes.ManagedBytesReadable; import io.grpc.Metadata; import io.grpc.MethodDescriptor.Marshaller; import io.grpc.MethodDescriptor.PrototypeMarshaller; @@ -170,10 +172,19 @@ public T parse(InputStream stream) { } } CodedInputStream cis = null; + ManagedBytes managedBytes = null; try { if (stream instanceof KnownLength) { int size = stream.available(); - if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) { + if (size <= 0) { + return defaultInstance; + } + // TODO(chengyuanzhang): we may still want to go with the byte array approach for small + // messages. + if (stream instanceof ManagedBytesReadable) { + managedBytes = ((ManagedBytesReadable) stream).readManagedBytes(size); + cis = CodedInputStream.newInstance(managedBytes.asByteBuffers()); + } else if (size < DEFAULT_MAX_MESSAGE_SIZE) { Reference ref; // buf should not be used after this method has returned. byte[] buf; @@ -197,8 +208,6 @@ public T parse(InputStream stream) { throw new RuntimeException("size inaccurate: " + size + " != " + position); } cis = CodedInputStream.newInstance(buf, 0, size); - } else if (size == 0) { - return defaultInstance; } } } catch (IOException e) { @@ -216,6 +225,10 @@ public T parse(InputStream stream) { } catch (InvalidProtocolBufferException ipbe) { throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence") .withCause(ipbe).asRuntimeException(); + } finally { + if (managedBytes != null) { + managedBytes.release(); + } } }