diff --git a/storage/integration_test.go b/storage/integration_test.go index 7faa31dbd3ec..16fd736f4865 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -3178,6 +3178,54 @@ func TestIntegration_ReaderAttrs(t *testing.T) { } } +// Test that context cancellation correctly stops a download before completion. +func TestIntegration_ReaderCancel(t *testing.T) { + ctx := context.Background() + client := testConfig(ctx, t) + defer client.Close() + + bkt := client.Bucket(bucketName) + + // Upload a 1MB object. + obj := bkt.Object("reader-cancel-obj") + w := obj.NewWriter(ctx) + c := randomContents() + for i := 0; i < 62500; i++ { + if _, err := w.Write(c); err != nil { + t.Fatalf("writer.Write: %v", err) + } + + } + w.Close() + + // Create a reader (which makes a GET request to GCS and opens the body to + // read the object) and then cancel the context before reading. + readerCtx, cancel := context.WithCancel(ctx) + r, err := obj.NewReader(readerCtx) + if err != nil { + t.Fatalf("obj.NewReader: %v", err) + } + defer r.Close() + + cancel() + + // Read the object 1KB a time. We cannot guarantee that Reads will return a + // context canceled error immediately, but they should always do so before we + // reach EOF. + var readErr error + for i := 0; i < 1000; i++ { + buf := make([]byte, 1000) + _, readErr = r.Read(buf) + if readErr != nil { + if readErr == context.Canceled { + return + } + break + } + } + t.Fatalf("Reader.Read: got %v, want context.Canceled", readErr) +} + // Ensures that a file stored with a: // * Content-Encoding of "gzip" // * Content-Type of "text/plain" diff --git a/storage/reader.go b/storage/reader.go index d64f5ec778c3..94563c2afeea 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -23,7 +23,6 @@ import ( "io/ioutil" "net/http" "net/url" - "reflect" "strconv" "strings" "time" @@ -135,6 +134,11 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) // Define a function that initiates a Read with offset and length, assuming we // have already read seen bytes. reopen := func(seen int64) (*http.Response, error) { + // If the context has already expired, return immediately without making a + // call. + if err := ctx.Err(); err != nil { + return nil, err + } start := offset + seen if length < 0 && start < 0 { req.Header.Set("Range", fmt.Sprintf("bytes=%d", start)) @@ -369,11 +373,12 @@ func (r *Reader) readWithRetry(p []byte) (int, error) { m, err := r.body.Read(p[n:]) n += m r.seen += int64(m) - if !shouldRetryRead(err) { + if err == nil || err == io.EOF { return n, err } - // Read failed, but we will try again. Send a ranged read request that takes - // into account the number of bytes we've already seen. + // Read failed (likely due to connection issues), but we will try to reopen + // the pipe and continue. Send a ranged read request that takes into account + // the number of bytes we've already seen. res, err := r.reopen(r.seen) if err != nil { // reopen already retries @@ -385,13 +390,6 @@ func (r *Reader) readWithRetry(p []byte) (int, error) { return n, nil } -func shouldRetryRead(err error) bool { - if err == nil { - return false - } - return strings.HasSuffix(err.Error(), "INTERNAL_ERROR") && strings.Contains(reflect.TypeOf(err).String(), "http2") -} - // Size returns the size of the object in bytes. // The returned value is always the same and is not affected by // calls to Read or Close. diff --git a/storage/reader_test.go b/storage/reader_test.go index 4b35f3c94c0f..81c9a0c6d804 100644 --- a/storage/reader_test.go +++ b/storage/reader_test.go @@ -131,7 +131,8 @@ func (h http2Error) Error() string { } func TestRangeReaderRetry(t *testing.T) { - retryErr := http2Error("blah blah INTERNAL_ERROR") + internalErr := http2Error("blah blah INTERNAL_ERROR") + goawayErr := http2Error("http2: server sent GOAWAY and closed the connection; LastStreamID=15, ErrCode=NO_ERROR, debug=\"load_shed\"") readBytes := []byte(readData) hc, close := newTestServer(handleRangeRead) defer close() @@ -159,7 +160,7 @@ func TestRangeReaderRetry(t *testing.T) { offset: 0, length: -1, bodies: []fakeReadCloser{ - {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes, counts: []int{3}, err: internalErr}, {data: readBytes[3:], counts: []int{5, 2}, err: io.EOF}, }, want: readData, @@ -168,8 +169,8 @@ func TestRangeReaderRetry(t *testing.T) { offset: 0, length: -1, bodies: []fakeReadCloser{ - {data: readBytes, counts: []int{5}, err: retryErr}, - {data: readBytes[5:], counts: []int{1, 3}, err: retryErr}, + {data: readBytes, counts: []int{5}, err: internalErr}, + {data: readBytes[5:], counts: []int{1, 3}, err: goawayErr}, {data: readBytes[9:], counts: []int{1}, err: io.EOF}, }, want: readData, @@ -178,7 +179,16 @@ func TestRangeReaderRetry(t *testing.T) { offset: 0, length: 5, bodies: []fakeReadCloser{ - {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes, counts: []int{3}, err: internalErr}, + {data: readBytes[3:], counts: []int{2}, err: io.EOF}, + }, + want: readData[:5], + }, + { + offset: 0, + length: 5, + bodies: []fakeReadCloser{ + {data: readBytes, counts: []int{3}, err: goawayErr}, {data: readBytes[3:], counts: []int{2}, err: io.EOF}, }, want: readData[:5], @@ -187,7 +197,7 @@ func TestRangeReaderRetry(t *testing.T) { offset: 1, length: 5, bodies: []fakeReadCloser{ - {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes, counts: []int{3}, err: internalErr}, {data: readBytes[3:], counts: []int{2}, err: io.EOF}, }, want: readData[:5], @@ -196,7 +206,7 @@ func TestRangeReaderRetry(t *testing.T) { offset: 1, length: 3, bodies: []fakeReadCloser{ - {data: readBytes[1:], counts: []int{1}, err: retryErr}, + {data: readBytes[1:], counts: []int{1}, err: internalErr}, {data: readBytes[2:], counts: []int{2}, err: io.EOF}, }, want: readData[1:4], @@ -205,8 +215,8 @@ func TestRangeReaderRetry(t *testing.T) { offset: 4, length: -1, bodies: []fakeReadCloser{ - {data: readBytes[4:], counts: []int{1}, err: retryErr}, - {data: readBytes[5:], counts: []int{4}, err: retryErr}, + {data: readBytes[4:], counts: []int{1}, err: internalErr}, + {data: readBytes[5:], counts: []int{4}, err: internalErr}, {data: readBytes[9:], counts: []int{1}, err: io.EOF}, }, want: readData[4:], @@ -215,7 +225,7 @@ func TestRangeReaderRetry(t *testing.T) { offset: -4, length: -1, bodies: []fakeReadCloser{ - {data: readBytes[6:], counts: []int{1}, err: retryErr}, + {data: readBytes[6:], counts: []int{1}, err: internalErr}, {data: readBytes[7:], counts: []int{3}, err: io.EOF}, }, want: readData[6:],