Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 97 additions & 51 deletions pkg/api/helpers/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-openapi/swag"
"github.com/hashicorp/go-retryablehttp"
"github.com/jedib0t/go-pretty/v6/progress"
Expand All @@ -25,6 +27,22 @@ const (
DefaultDownloadConcurrency = 10
)

// Backoff parameters for download retries
const (
DefaultDownloadInitialInterval = 100 * time.Millisecond
DefaultDownloadMaxInterval = 2 * time.Second
DefaultDownloadMaxElapsedTime = 30 * time.Second
)

// newDownloadBackoff creates a backoff strategy for download retries.
func newDownloadBackoff() backoff.BackOff {
return backoff.NewExponentialBackOff(
backoff.WithInitialInterval(DefaultDownloadInitialInterval),
backoff.WithMaxInterval(DefaultDownloadMaxInterval),
backoff.WithMaxElapsedTime(DefaultDownloadMaxElapsedTime),
)
}

type Downloader struct {
Client *apigen.ClientWithResponses
PreSign bool
Expand Down Expand Up @@ -155,9 +173,7 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI,
if err != nil {
return err
}
defer func() {
_ = f.Close()
}()
defer func() { _ = f.Close() }()

// make sure the destination file is in the right size
if err := f.Truncate(size); err != nil {
Expand Down Expand Up @@ -218,25 +234,9 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI,
}

func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress string, rangeStart int64, partSize int64, partNumber int, f *os.File, buf []byte) error {
// set range header
rangeEnd := rangeStart + partSize - 1
rangeHeader := fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil)
if err != nil {
return err
}
req.Header.Set("Range", rangeHeader)
resp, err := d.HTTPClient.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusPartialContent {
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
}
if resp.ContentLength != partSize {
return fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength)
}

// reuse buffer if possible
if buf == nil {
Expand All @@ -245,48 +245,94 @@ func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress
buf = buf[:partSize]
}

_, err = io.ReadFull(resp.Body, buf)
if err != nil {
return err
operation := func() error {
// create request with range header
req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil)
if err != nil {
return backoff.Permanent(err)
}
req.Header.Set("Range", rangeHeader)

resp, err := d.HTTPClient.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusPartialContent {
err := fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
return backoff.Permanent(err)
}
if resp.ContentLength != partSize {
err := fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength)
return backoff.Permanent(err)
}
_, readErr := io.ReadFull(resp.Body, buf)
return readErr
}

_, err = f.WriteAt(buf, rangeStart)
if err != nil {
return err
bo := backoff.WithContext(newDownloadBackoff(), ctx)
notification := func(err error, d time.Duration) {
logging.FromContext(ctx).WithError(err).Warnf("Download of '%s' part %d failed, retrying in %s", f.Name(), partNumber, d)
}
return nil
if err := backoff.RetryNotify(operation, bo, notification); err != nil {
return fmt.Errorf("failed to download '%s' part %d: %w", f.Name(), partNumber, err)
}

_, err := f.WriteAt(buf, rangeStart)
return err
}

func (d *Downloader) downloadObject(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker) error {
// get object content
resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
Path: *src.Path,
Presign: swag.Bool(d.PreSign),
})
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
}
operation := func() error {
// get object content
resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
Path: *src.Path,
Presign: swag.Bool(d.PreSign),
})
if err != nil {
return backoff.Permanent(err)
}
defer func() {
_ = resp.Body.Close()
}()

if tracker != nil && resp.ContentLength != -1 {
tracker.UpdateTotal(resp.ContentLength)
}
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
return backoff.Permanent(err)
}

// create and copy object content
f, err := os.Create(dst)
if err != nil {
// create and copy object content
f, err := os.Create(dst)
if err != nil {
return backoff.Permanent(err)
}
defer func() {
_ = f.Close()
}()

// w is used to write the data, it will be wrapped with a tracker if needed
var w io.Writer = f
if tracker != nil {
tracker.Reset()
if resp.ContentLength != -1 {
tracker.UpdateTotal(resp.ContentLength)
}
w = NewTrackerWriter(f, tracker)
}

_, err = io.Copy(w, resp.Body)
return err
}
defer func() { _ = f.Close() }()
var w io.Writer = f
if tracker != nil {
w = NewTrackerWriter(f, tracker)

b := backoff.WithContext(newDownloadBackoff(), ctx)
notification := func(err error, d time.Duration) {
logging.FromContext(ctx).WithError(err).Warnf("Download of object '%s' failed, retrying in %s", dst, d)
}
_, err = io.Copy(w, resp.Body)
return err
if err := backoff.RetryNotify(operation, b, notification); err != nil {
return fmt.Errorf("failed to download '%s' object: %w", dst, err)
}
return nil
}

// Tracker interface for tracking written data.
Expand Down
Loading