diff --git a/pkg/api/helpers/download.go b/pkg/api/helpers/download.go index 3265a1c90fb..9564ecd8bbf 100644 --- a/pkg/api/helpers/download.go +++ b/pkg/api/helpers/download.go @@ -7,7 +7,9 @@ import ( "net/http" "os" "path/filepath" + "time" + "github.com/cenkalti/backoff/v4" "github.com/go-openapi/swag" "github.com/hashicorp/go-retryablehttp" "github.com/jedib0t/go-pretty/v6/progress" @@ -25,6 +27,22 @@ const ( DefaultDownloadConcurrency = 10 ) +// Backoff parameters for download retries +const ( + DefaultDownloadInitialInterval = 100 * time.Millisecond + DefaultDownloadMaxInterval = 2 * time.Second + DefaultDownloadMaxElapsedTime = 30 * time.Second +) + +// newDownloadBackoff creates a backoff strategy for download retries. +func newDownloadBackoff() backoff.BackOff { + return backoff.NewExponentialBackOff( + backoff.WithInitialInterval(DefaultDownloadInitialInterval), + backoff.WithMaxInterval(DefaultDownloadMaxInterval), + backoff.WithMaxElapsedTime(DefaultDownloadMaxElapsedTime), + ) +} + type Downloader struct { Client *apigen.ClientWithResponses PreSign bool @@ -155,9 +173,7 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI, if err != nil { return err } - defer func() { - _ = f.Close() - }() + defer func() { _ = f.Close() }() // make sure the destination file is in the right size if err := f.Truncate(size); err != nil { @@ -218,25 +234,9 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI, } func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress string, rangeStart int64, partSize int64, partNumber int, f *os.File, buf []byte) error { + // set range header rangeEnd := rangeStart + partSize - 1 rangeHeader := fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil) - if err != nil { - return err - } - req.Header.Set("Range", rangeHeader) - resp, err := d.HTTPClient.Do(req) - if err != nil { - return err - } - defer func() { _ = resp.Body.Close() }() - - if resp.StatusCode != http.StatusPartialContent { - return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status) - } - if resp.ContentLength != partSize { - return fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength) - } // reuse buffer if possible if buf == nil { @@ -245,48 +245,94 @@ func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress buf = buf[:partSize] } - _, err = io.ReadFull(resp.Body, buf) - if err != nil { - return err + operation := func() error { + // create request with range header + req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil) + if err != nil { + return backoff.Permanent(err) + } + req.Header.Set("Range", rangeHeader) + + resp, err := d.HTTPClient.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusPartialContent { + err := fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status) + return backoff.Permanent(err) + } + if resp.ContentLength != partSize { + err := fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength) + return backoff.Permanent(err) + } + _, readErr := io.ReadFull(resp.Body, buf) + return readErr } - _, err = f.WriteAt(buf, rangeStart) - if err != nil { - return err + bo := backoff.WithContext(newDownloadBackoff(), ctx) + notification := func(err error, d time.Duration) { + logging.FromContext(ctx).WithError(err).Warnf("Download of '%s' part %d failed, retrying in %s", f.Name(), partNumber, d) } - return nil + if err := backoff.RetryNotify(operation, bo, notification); err != nil { + return fmt.Errorf("failed to download '%s' part %d: %w", f.Name(), partNumber, err) + } + + _, err := f.WriteAt(buf, rangeStart) + return err } func (d *Downloader) downloadObject(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker) error { - // get object content - resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{ - Path: *src.Path, - Presign: swag.Bool(d.PreSign), - }) - if err != nil { - return err - } - defer func() { _ = resp.Body.Close() }() - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status) - } + operation := func() error { + // get object content + resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{ + Path: *src.Path, + Presign: swag.Bool(d.PreSign), + }) + if err != nil { + return backoff.Permanent(err) + } + defer func() { + _ = resp.Body.Close() + }() - if tracker != nil && resp.ContentLength != -1 { - tracker.UpdateTotal(resp.ContentLength) - } + if resp.StatusCode != http.StatusOK { + err := fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status) + return backoff.Permanent(err) + } - // create and copy object content - f, err := os.Create(dst) - if err != nil { + // create and copy object content + f, err := os.Create(dst) + if err != nil { + return backoff.Permanent(err) + } + defer func() { + _ = f.Close() + }() + + // w is used to write the data, it will be wrapped with a tracker if needed + var w io.Writer = f + if tracker != nil { + tracker.Reset() + if resp.ContentLength != -1 { + tracker.UpdateTotal(resp.ContentLength) + } + w = NewTrackerWriter(f, tracker) + } + + _, err = io.Copy(w, resp.Body) return err } - defer func() { _ = f.Close() }() - var w io.Writer = f - if tracker != nil { - w = NewTrackerWriter(f, tracker) + + b := backoff.WithContext(newDownloadBackoff(), ctx) + notification := func(err error, d time.Duration) { + logging.FromContext(ctx).WithError(err).Warnf("Download of object '%s' failed, retrying in %s", dst, d) } - _, err = io.Copy(w, resp.Body) - return err + if err := backoff.RetryNotify(operation, b, notification); err != nil { + return fmt.Errorf("failed to download '%s' object: %w", dst, err) + } + return nil } // Tracker interface for tracking written data.