Skip to content

Commit

Permalink
fix(storage): try to reopen for failed Reads (#4226)
Browse files Browse the repository at this point in the history
Errors from reading the response body in Reader.Read will now always trigger a reopen() call (unless the context has been canceled). Previously, this was limited to only INTERNAL_ERROR from HTTP/2.

Fixes #3040
  • Loading branch information
tritone authored Jun 23, 2021
1 parent 6c7925e commit 564102b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 21 deletions.
48 changes: 48 additions & 0 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3178,6 +3178,54 @@ func TestIntegration_ReaderAttrs(t *testing.T) {
}
}

// Test that context cancellation correctly stops a download before completion.
func TestIntegration_ReaderCancel(t *testing.T) {
ctx := context.Background()
client := testConfig(ctx, t)
defer client.Close()

bkt := client.Bucket(bucketName)

// Upload a 1MB object.
obj := bkt.Object("reader-cancel-obj")
w := obj.NewWriter(ctx)
c := randomContents()
for i := 0; i < 62500; i++ {
if _, err := w.Write(c); err != nil {
t.Fatalf("writer.Write: %v", err)
}

}
w.Close()

// Create a reader (which makes a GET request to GCS and opens the body to
// read the object) and then cancel the context before reading.
readerCtx, cancel := context.WithCancel(ctx)
r, err := obj.NewReader(readerCtx)
if err != nil {
t.Fatalf("obj.NewReader: %v", err)
}
defer r.Close()

cancel()

// Read the object 1KB a time. We cannot guarantee that Reads will return a
// context canceled error immediately, but they should always do so before we
// reach EOF.
var readErr error
for i := 0; i < 1000; i++ {
buf := make([]byte, 1000)
_, readErr = r.Read(buf)
if readErr != nil {
if readErr == context.Canceled {
return
}
break
}
}
t.Fatalf("Reader.Read: got %v, want context.Canceled", readErr)
}

// Ensures that a file stored with a:
// * Content-Encoding of "gzip"
// * Content-Type of "text/plain"
Expand Down
20 changes: 9 additions & 11 deletions storage/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -135,6 +134,11 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
// Define a function that initiates a Read with offset and length, assuming we
// have already read seen bytes.
reopen := func(seen int64) (*http.Response, error) {
// If the context has already expired, return immediately without making a
// call.
if err := ctx.Err(); err != nil {
return nil, err
}
start := offset + seen
if length < 0 && start < 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d", start))
Expand Down Expand Up @@ -369,11 +373,12 @@ func (r *Reader) readWithRetry(p []byte) (int, error) {
m, err := r.body.Read(p[n:])
n += m
r.seen += int64(m)
if !shouldRetryRead(err) {
if err == nil || err == io.EOF {
return n, err
}
// Read failed, but we will try again. Send a ranged read request that takes
// into account the number of bytes we've already seen.
// Read failed (likely due to connection issues), but we will try to reopen
// the pipe and continue. Send a ranged read request that takes into account
// the number of bytes we've already seen.
res, err := r.reopen(r.seen)
if err != nil {
// reopen already retries
Expand All @@ -385,13 +390,6 @@ func (r *Reader) readWithRetry(p []byte) (int, error) {
return n, nil
}

func shouldRetryRead(err error) bool {
if err == nil {
return false
}
return strings.HasSuffix(err.Error(), "INTERNAL_ERROR") && strings.Contains(reflect.TypeOf(err).String(), "http2")
}

// Size returns the size of the object in bytes.
// The returned value is always the same and is not affected by
// calls to Read or Close.
Expand Down
30 changes: 20 additions & 10 deletions storage/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (h http2Error) Error() string {
}

func TestRangeReaderRetry(t *testing.T) {
retryErr := http2Error("blah blah INTERNAL_ERROR")
internalErr := http2Error("blah blah INTERNAL_ERROR")
goawayErr := http2Error("http2: server sent GOAWAY and closed the connection; LastStreamID=15, ErrCode=NO_ERROR, debug=\"load_shed\"")
readBytes := []byte(readData)
hc, close := newTestServer(handleRangeRead)
defer close()
Expand Down Expand Up @@ -159,7 +160,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{5, 2}, err: io.EOF},
},
want: readData,
Expand All @@ -168,8 +169,8 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{5}, err: retryErr},
{data: readBytes[5:], counts: []int{1, 3}, err: retryErr},
{data: readBytes, counts: []int{5}, err: internalErr},
{data: readBytes[5:], counts: []int{1, 3}, err: goawayErr},
{data: readBytes[9:], counts: []int{1}, err: io.EOF},
},
want: readData,
Expand All @@ -178,7 +179,16 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 0,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
},
{
offset: 0,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: goawayErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
Expand All @@ -187,7 +197,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 1,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes, counts: []int{3}, err: internalErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
Expand All @@ -196,7 +206,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 1,
length: 3,
bodies: []fakeReadCloser{
{data: readBytes[1:], counts: []int{1}, err: retryErr},
{data: readBytes[1:], counts: []int{1}, err: internalErr},
{data: readBytes[2:], counts: []int{2}, err: io.EOF},
},
want: readData[1:4],
Expand All @@ -205,8 +215,8 @@ func TestRangeReaderRetry(t *testing.T) {
offset: 4,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes[4:], counts: []int{1}, err: retryErr},
{data: readBytes[5:], counts: []int{4}, err: retryErr},
{data: readBytes[4:], counts: []int{1}, err: internalErr},
{data: readBytes[5:], counts: []int{4}, err: internalErr},
{data: readBytes[9:], counts: []int{1}, err: io.EOF},
},
want: readData[4:],
Expand All @@ -215,7 +225,7 @@ func TestRangeReaderRetry(t *testing.T) {
offset: -4,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes[6:], counts: []int{1}, err: retryErr},
{data: readBytes[6:], counts: []int{1}, err: internalErr},
{data: readBytes[7:], counts: []int{3}, err: io.EOF},
},
want: readData[6:],
Expand Down

0 comments on commit 564102b

Please sign in to comment.