Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 101 additions & 52 deletions pkg/api/helpers/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"fmt"
"io"
"math/rand/v2"
"net/http"
"os"
"path/filepath"
"time"

"github.com/go-openapi/swag"
"github.com/hashicorp/go-retryablehttp"
Expand All @@ -20,9 +22,11 @@ import (
)

const (
	// MinDownloadPartSize is the smallest allowed multipart download part size (64KB).
	MinDownloadPartSize int64 = 1024 * 64
	// DefaultDownloadPartSize is the default multipart download part size (8MB).
	DefaultDownloadPartSize int64 = 1024 * 1024 * 8
	// DefaultDownloadConcurrency is the default number of parts downloaded in parallel.
	DefaultDownloadConcurrency = 10
	// DefaultDownloadBodyRetries is the default number of retries for body read/write operations.
	DefaultDownloadBodyRetries = 3
	// DefaultDownloadRetryDelayMs is the upper bound, in milliseconds, of the random delay
	// slept between body read/write retry attempts.
	DefaultDownloadRetryDelayMs = 1000
)

type Downloader struct {
Expand All @@ -32,6 +36,7 @@ type Downloader struct {
PartSize int64
SkipNonRegularFiles bool
SymlinkSupport bool
BodyRetries int // Number of retries for body read/write operations
}

type downloadPart struct {
Expand Down Expand Up @@ -59,6 +64,7 @@ func NewDownloader(client *apigen.ClientWithResponses, preSign bool) *Downloader
PartSize: DefaultDownloadPartSize,
SkipNonRegularFiles: false,
SymlinkSupport: false,
BodyRetries: DefaultDownloadBodyRetries,
}
}

Expand Down Expand Up @@ -218,25 +224,9 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI,
}

func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress string, rangeStart int64, partSize int64, partNumber int, f *os.File, buf []byte) error {
// set range header
rangeEnd := rangeStart + partSize - 1
rangeHeader := fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil)
if err != nil {
return err
}
req.Header.Set("Range", rangeHeader)
resp, err := d.HTTPClient.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusPartialContent {
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
}
if resp.ContentLength != partSize {
return fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength)
}

// reuse buffer if possible
if buf == nil {
Expand All @@ -245,48 +235,107 @@ func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress
buf = buf[:partSize]
}

_, err = io.ReadFull(resp.Body, buf)
var err error
for attempt := 0; attempt <= d.BodyRetries; attempt++ {
if attempt > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think that all errors should be retryable?

  1. HTTP requests are already being retried.
  2. There are some unrelated failures, like OS operations, that we should retry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update the code to consider the http request and any other check (like etag) to be final error not be counted as part of the network error retry.

// sleep for a random time between 0 and 1 second or break on context cancellation
select {
//nolint:gosec
case <-time.After(time.Duration(rand.IntN(DefaultDownloadRetryDelayMs)) * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some retry package that we use often. Why did you decide on using some custom logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will switch to use backoff.Retry


// create request with range header
var req *http.Request
req, err = http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil)
if err != nil {
return err
}
req.Header.Set("Range", rangeHeader)

err = executeHTTPRequest(d.HTTPClient, req, func(resp *http.Response) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't you passing the retryClient here? It means that it will retry (attempts)^2 before failing, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • the helper is to verify that we do close the body.
  • can't reuse a request so for any error during upload and download we perform a new request

if resp.StatusCode != http.StatusPartialContent {
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
}
if resp.ContentLength != partSize {
return fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength)
}
_, readErr := io.ReadFull(resp.Body, buf)
return readErr
})
if err == nil {
break
}
}
if err != nil {
return err
return fmt.Errorf("failed to download part %d after %d retries: %w", partNumber, d.BodyRetries+1, err)
}

_, err = f.WriteAt(buf, rangeStart)
if err != nil {
return err
}
return nil
return err
}

func (d *Downloader) downloadObject(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker) error {
// get object content
resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
Path: *src.Path,
Presign: swag.Bool(d.PreSign),
})
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
}
var err error
for attempt := 0; attempt <= d.BodyRetries; attempt++ {
if attempt > 0 {
// sleep for a random time between 0 and 1 second or break on context cancellation
select {
//nolint:gosec
case <-time.After(time.Duration(rand.IntN(DefaultDownloadRetryDelayMs)) * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
// Remove destination file if retrying (will be recreated)
_ = os.Remove(dst)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is hard to maintain, since it's so easy to miss this when the code below evolves.
WDYT about truncating the file upon first write instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. kept the same code as we added remove when we added symlink support where truncate is not an option.

}

if tracker != nil && resp.ContentLength != -1 {
tracker.UpdateTotal(resp.ContentLength)
}
// get object content
var resp *http.Response
resp, err = d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
Path: *src.Path,
Presign: swag.Bool(d.PreSign),
})
if err != nil {
return err
}
defer func() {
_ = resp.Body.Close()
}()

// create and copy object content
f, err := os.Create(dst)
if err != nil {
return err
}
defer func() { _ = f.Close() }()
var w io.Writer = f
if tracker != nil {
w = NewTrackerWriter(f, tracker)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
}

// create and copy object content
var f *os.File
f, err = os.Create(dst)
if err != nil {
return err
}
defer func() {
_ = f.Close()
}()

// w is used to write the data, it will be wrapped with a tracker if needed
var w io.Writer = f
if tracker != nil {
if resp.ContentLength != -1 {
tracker.UpdateTotal(resp.ContentLength)
}
w = NewTrackerWriter(f, tracker)
}

_, err = io.Copy(w, resp.Body)
// if write is successful, return nil
if err == nil {
return nil
}
}
_, err = io.Copy(w, resp.Body)
return err

return fmt.Errorf("failed to download object after %d retries: %w", d.BodyRetries+1, err)
}

// Tracker interface for tracking written data.
Expand Down
14 changes: 14 additions & 0 deletions pkg/api/helpers/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,17 @@ func HTTPResponseAsError(httpResponse *http.Response) error {
},
}
}

// executeHTTPRequest executes an HTTP request and ensures the response body is closed with defer.
// The callback function receives the response and should return an error if the request should be retried.
// The response body will be automatically closed after the callback returns.
func executeHTTPRequest(client *http.Client, req *http.Request, callback func(*http.Response) error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be in errors.go

resp, err := client.Do(req)
if err != nil {
return err
}
defer func() {
_ = resp.Body.Close()
}()
return callback(resp)
}
Loading
Loading