From 96cdbc8b9f971da47ee90e90243dbff0c5fa6970 Mon Sep 17 00:00:00 2001 From: Abhishek Modi Date: Tue, 16 Aug 2022 19:01:31 +0530 Subject: [PATCH] Simplifying gRPC buffered read flow (#853) * Simplifying gRPC buffered read flow * Addressing CR comments --- .../GoogleCloudStorageGrpcReadChannel.java | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcReadChannel.java index 6b67d99f61..37d176f659 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageGrpcReadChannel.java @@ -105,8 +105,6 @@ public class GoogleCloudStorageGrpcReadChannel implements SeekableByteChannel { // keep the most recently received content and a reference to how much of it we've returned so far @Nullable private ByteString bufferedContent; - private int bufferedContentReadOffset; - // InputStream that backs bufferedContent. This needs to be closed when bufferedContent is no // longer needed. @Nullable private InputStream streamForBufferedContent; @@ -266,23 +264,25 @@ private static void put(ByteString source, int offset, int size, ByteBuffer dest private int readBufferedContentInto(ByteBuffer byteBuffer) { // Handle skipping forward through the buffer for a seek. long bytesToSkip = positionForNextRead - positionInGrpcStream; - long bufferSkip = min(bufferedContent.size() - bufferedContentReadOffset, bytesToSkip); - bufferSkip = max(0, bufferSkip); - bufferedContentReadOffset += bufferSkip; - positionInGrpcStream += bufferSkip; - int remainingBufferedBytes = bufferedContent.size() - bufferedContentReadOffset; - - boolean remainingBufferedContentLargerThanByteBuffer = - remainingBufferedBytes > byteBuffer.remaining(); - int bytesToWrite = - remainingBufferedContentLargerThanByteBuffer - ? byteBuffer.remaining() - : remainingBufferedBytes; - put(bufferedContent, bufferedContentReadOffset, bytesToWrite, byteBuffer); + + if (bytesToSkip >= bufferedContent.size()) { + positionInGrpcStream += bufferedContent.size(); + invalidateBufferedContent(); + return 0; + } + + if (bytesToSkip > 0) { + positionInGrpcStream += bytesToSkip; + bufferedContent = bufferedContent.substring(Math.toIntExact(bytesToSkip)); + } + + int bytesToWrite = Math.min(byteBuffer.remaining(), bufferedContent.size()); + put(bufferedContent, 0, bytesToWrite, byteBuffer); positionInGrpcStream += bytesToWrite; positionForNextRead += bytesToWrite; - if (remainingBufferedContentLargerThanByteBuffer) { - bufferedContentReadOffset += bytesToWrite; + + if (bytesToWrite < bufferedContent.size()) { + bufferedContent = bufferedContent.substring(bytesToWrite); } else { invalidateBufferedContent(); } @@ -447,8 +447,7 @@ private int readObjectContentFromGCS(ByteBuffer byteBuffer) throws IOException { positionForNextRead += bytesToWrite; if (responseSizeLargerThanRemainingBuffer) { invalidateBufferedContent(); - bufferedContent = content; - bufferedContentReadOffset = bytesToWrite; + bufferedContent = content.substring(bytesToWrite); // This is to keep the stream alive for the message backed by this. streamForBufferedContent = stream; stream = null; @@ -712,7 +711,6 @@ public String toString() { private void invalidateBufferedContent() { bufferedContent = null; - bufferedContentReadOffset = 0; if (streamForBufferedContent != null) { try { streamForBufferedContent.close();