Skip to content

Commit

Permalink
Simplifying gRPC buffered read flow (#853)
Browse files Browse the repository at this point in the history
* Simplifying gRPC buffered read flow

* Addressing CR comments
  • Loading branch information
abmodi committed Aug 16, 2022
1 parent 782b3e9 commit 96cdbc8
Showing 1 changed file with 18 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ public class GoogleCloudStorageGrpcReadChannel implements SeekableByteChannel {
// keep the most recently received content and a reference to how much of it we've returned so far
@Nullable private ByteString bufferedContent;

private int bufferedContentReadOffset;

// InputStream that backs bufferedContent. This needs to be closed when bufferedContent is no
// longer needed.
@Nullable private InputStream streamForBufferedContent;
Expand Down Expand Up @@ -266,23 +264,25 @@ private static void put(ByteString source, int offset, int size, ByteBuffer dest
private int readBufferedContentInto(ByteBuffer byteBuffer) {
// Handle skipping forward through the buffer for a seek.
long bytesToSkip = positionForNextRead - positionInGrpcStream;
long bufferSkip = min(bufferedContent.size() - bufferedContentReadOffset, bytesToSkip);
bufferSkip = max(0, bufferSkip);
bufferedContentReadOffset += bufferSkip;
positionInGrpcStream += bufferSkip;
int remainingBufferedBytes = bufferedContent.size() - bufferedContentReadOffset;

boolean remainingBufferedContentLargerThanByteBuffer =
remainingBufferedBytes > byteBuffer.remaining();
int bytesToWrite =
remainingBufferedContentLargerThanByteBuffer
? byteBuffer.remaining()
: remainingBufferedBytes;
put(bufferedContent, bufferedContentReadOffset, bytesToWrite, byteBuffer);

if (bytesToSkip >= bufferedContent.size()) {
positionInGrpcStream += bufferedContent.size();
invalidateBufferedContent();
return 0;
}

if (bytesToSkip > 0) {
positionInGrpcStream += bytesToSkip;
bufferedContent = bufferedContent.substring(Math.toIntExact(bytesToSkip));
}

int bytesToWrite = Math.min(byteBuffer.remaining(), bufferedContent.size());
put(bufferedContent, 0, bytesToWrite, byteBuffer);
positionInGrpcStream += bytesToWrite;
positionForNextRead += bytesToWrite;
if (remainingBufferedContentLargerThanByteBuffer) {
bufferedContentReadOffset += bytesToWrite;

if (bytesToWrite < bufferedContent.size()) {
bufferedContent = bufferedContent.substring(bytesToWrite);
} else {
invalidateBufferedContent();
}
Expand Down Expand Up @@ -447,8 +447,7 @@ private int readObjectContentFromGCS(ByteBuffer byteBuffer) throws IOException {
positionForNextRead += bytesToWrite;
if (responseSizeLargerThanRemainingBuffer) {
invalidateBufferedContent();
bufferedContent = content;
bufferedContentReadOffset = bytesToWrite;
bufferedContent = content.substring(bytesToWrite);
// This is to keep the stream alive for the message backed by this.
streamForBufferedContent = stream;
stream = null;
Expand Down Expand Up @@ -712,7 +711,6 @@ public String toString() {

private void invalidateBufferedContent() {
bufferedContent = null;
bufferedContentReadOffset = 0;
if (streamForBufferedContent != null) {
try {
streamForBufferedContent.close();
Expand Down

0 comments on commit 96cdbc8

Please sign in to comment.