Merged
Changes from 6 commits
142 changes: 91 additions & 51 deletions pkg/api/helpers/download.go
@@ -7,7 +7,9 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-openapi/swag"
"github.com/hashicorp/go-retryablehttp"
"github.com/jedib0t/go-pretty/v6/progress"
@@ -25,6 +27,22 @@ const (
DefaultDownloadConcurrency = 10
)

// Backoff parameters for download retries
const (
DefaultDownloadInitialInterval = 100 * time.Millisecond
DefaultDownloadMaxInterval = 2 * time.Second
DefaultDownloadMaxElapsedTime = 30 * time.Second
)

// newDownloadBackoff creates a backoff strategy for download retries.
func newDownloadBackoff() backoff.BackOff {
return backoff.NewExponentialBackOff(
backoff.WithInitialInterval(DefaultDownloadInitialInterval),
backoff.WithMaxInterval(DefaultDownloadMaxInterval),
backoff.WithMaxElapsedTime(DefaultDownloadMaxElapsedTime),
)
}
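
A policy built this way is typically handed to `backoff.Retry`, which re-runs the operation with growing delays until it succeeds, returns a permanent error, or the 30s budget runs out. A minimal self-contained sketch of that loop, using the same cenkalti/backoff/v4 options this PR uses (the flaky `fetchOnce` is hypothetical):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	attempts := 0
	fetchOnce := func() error { // hypothetical flaky operation
		attempts++
		if attempts < 3 {
			return errors.New("transient failure") // retried with backoff
		}
		return nil
	}

	bo := backoff.NewExponentialBackOff(
		backoff.WithInitialInterval(100*time.Millisecond), // first delay
		backoff.WithMaxInterval(2*time.Second),            // cap on a single delay
		backoff.WithMaxElapsedTime(30*time.Second),        // total retry budget
	)
	if err := backoff.Retry(fetchOnce, bo); err != nil {
		fmt.Println("gave up:", err)
		return
	}
	fmt.Printf("succeeded after %d attempts\n", attempts)
}
```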

type Downloader struct {
Client *apigen.ClientWithResponses
PreSign bool
@@ -155,9 +173,7 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI,
if err != nil {
return err
}
defer func() {
_ = f.Close()
}()
defer func() { _ = f.Close() }()

// make sure the destination file is the right size
if err := f.Truncate(size); err != nil {
@@ -218,25 +234,9 @@ }
}

func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress string, rangeStart int64, partSize int64, partNumber int, f *os.File, buf []byte) error {
// set range header
rangeEnd := rangeStart + partSize - 1
rangeHeader := fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil)
if err != nil {
return err
}
req.Header.Set("Range", rangeHeader)
resp, err := d.HTTPClient.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusPartialContent {
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
}
if resp.ContentLength != partSize {
return fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength)
}

// reuse buffer if possible
if buf == nil {
@@ -245,48 +245,88 @@ func (d *Downloader) downloadPresignedPart(ctx context.Context, physicalAddress
buf = buf[:partSize]
}

_, err = io.ReadFull(resp.Body, buf)
if err != nil {
return err
operation := func() error {
// create request with range header
req, err := http.NewRequestWithContext(ctx, http.MethodGet, physicalAddress, nil)
if err != nil {
return backoff.Permanent(err)
}
req.Header.Set("Range", rangeHeader)

resp, err := d.HTTPClient.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusPartialContent {
err := fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
return backoff.Permanent(err)
}
if resp.ContentLength != partSize {
err := fmt.Errorf("%w: part %d expected %d bytes, got %d", ErrRequestFailed, partNumber, partSize, resp.ContentLength)
return backoff.Permanent(err)
}
_, readErr := io.ReadFull(resp.Body, buf)
return readErr
}

_, err = f.WriteAt(buf, rangeStart)
if err != nil {
return err
bo := backoff.WithContext(newDownloadBackoff(), ctx)
if err := backoff.Retry(operation, bo); err != nil {
return fmt.Errorf("failed to download part %d: %w", partNumber, err)
}
return nil

_, err := f.WriteAt(buf, rangeStart)
return err
}
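
For context on how parts like this compose: the multipart download around this helper (not fully shown in the diff) fans out over byte ranges, and `*os.File.WriteAt` keeps the concurrent writes safe because each worker owns a disjoint offset range. A sketch of that fan-out under assumed names (`fetchRange` and `downloadParts` are illustrative, not this PR's code):

```go
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"

	"golang.org/x/sync/errgroup"
)

// fetchRange GETs one byte range of url into buf (illustrative helper).
func fetchRange(ctx context.Context, url string, start int64, buf []byte) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	end := start + int64(len(buf)) - 1
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode != http.StatusPartialContent {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	_, err = io.ReadFull(resp.Body, buf)
	return err
}

// downloadParts fans out over disjoint ranges; WriteAt needs no extra locking.
func downloadParts(ctx context.Context, url string, f *os.File, size, partSize int64, workers int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(workers)
	for start := int64(0); start < size; start += partSize {
		start, n := start, partSize
		if start+n > size {
			n = size - start // the last part may be shorter
		}
		g.Go(func() error {
			buf := make([]byte, n)
			if err := fetchRange(ctx, url, start, buf); err != nil {
				return err
			}
			_, err := f.WriteAt(buf, start)
			return err
		})
	}
	return g.Wait()
}
```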

func (d *Downloader) downloadObject(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker) error {
// get object content
resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
Path: *src.Path,
Presign: swag.Bool(d.PreSign),
})
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
}
operation := func() error {
// get object content
resp, err := d.Client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
Path: *src.Path,
Presign: swag.Bool(d.PreSign),
})
if err != nil {
return backoff.Permanent(err)
}
defer func() {
_ = resp.Body.Close()
}()

if tracker != nil && resp.ContentLength != -1 {
tracker.UpdateTotal(resp.ContentLength)
}
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("%w: %s", ErrRequestFailed, resp.Status)
return backoff.Permanent(err)
}

// create and copy object content
f, err := os.Create(dst)
if err != nil {
// create and copy object content
f, err := os.Create(dst)
if err != nil {
return backoff.Permanent(err)
}
defer func() {
_ = f.Close()
}()

// w is used to write the data; it is wrapped with a tracker if needed
var w io.Writer = f
if tracker != nil {
tracker.Reset()
if resp.ContentLength != -1 {
tracker.UpdateTotal(resp.ContentLength)
}
w = NewTrackerWriter(f, tracker)
}

_, err = io.Copy(w, resp.Body)
return err
}
defer func() { _ = f.Close() }()
var w io.Writer = f
if tracker != nil {
w = NewTrackerWriter(f, tracker)

b := backoff.WithContext(newDownloadBackoff(), ctx)
if err := backoff.Retry(operation, b); err != nil {
return fmt.Errorf("failed to download object: %w", err)
}
_, err = io.Copy(w, resp.Body)
return err
return nil
}
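
Not shown in this hunk: `NewTrackerWriter` presumably wraps the destination file so the progress tracker advances as bytes land. The pattern is only a few lines; a sketch with assumed names, not necessarily the helper's real shape:

```go
// trackerWriter forwards writes to the underlying writer and advances
// the go-pretty progress tracker by the bytes actually written.
type trackerWriter struct {
	w io.Writer
	t *progress.Tracker
}

func (tw *trackerWriter) Write(p []byte) (int, error) {
	n, err := tw.w.Write(p)
	tw.t.Increment(int64(n)) // count only bytes actually written
	return n, err
}
```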

// Tracker interface for tracking written data.
157 changes: 112 additions & 45 deletions pkg/api/helpers/upload.go
@@ -15,7 +15,9 @@ import (
"net/url"
"path/filepath"
"strings"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-openapi/swag"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
@@ -36,6 +38,22 @@ const DefaultUploadPartSize = MinUploadPartSize
// DefaultUploadConcurrency is the default number of goroutines to spin up when performing a multipart upload
const DefaultUploadConcurrency = 5

// Backoff parameters for upload retries
const (
DefaultUploadInitialInterval = 100 * time.Millisecond
DefaultUploadMaxInterval = 2 * time.Second
DefaultUploadMaxElapsedTime = 30 * time.Second
)

// newUploadBackoff creates a backoff strategy for upload retries.
func newUploadBackoff() backoff.BackOff {
return backoff.NewExponentialBackOff(
backoff.WithInitialInterval(DefaultUploadInitialInterval),
backoff.WithMaxInterval(DefaultUploadMaxInterval),
backoff.WithMaxElapsedTime(DefaultUploadMaxElapsedTime),
)
}
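
Both retry loops in this PR rely on `backoff.Permanent` to separate transient failures from ones a retry cannot fix: `backoff.Retry` stops immediately when the operation returns a wrapped permanent error and hands back the inner error. A small sketch of that distinction (the 403 check is an illustrative stand-in):

```go
func getWithRetry(url string) error {
	op := func() error {
		resp, err := http.Get(url)
		if err != nil {
			return err // transient: retried up to MaxElapsedTime
		}
		defer func() { _ = resp.Body.Close() }()
		if resp.StatusCode == http.StatusForbidden {
			// A 403 will not fix itself; fail fast and skip remaining retries.
			return backoff.Permanent(fmt.Errorf("request failed: %s", resp.Status))
		}
		return nil
	}
	return backoff.Retry(op, newUploadBackoff())
}
```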

// ClientUpload uploads contents as a file via lakeFS
func ClientUpload(ctx context.Context, client apigen.ClientWithResponsesInterface, repoID, branchID, objPath string, metadata map[string]string, contentType string, contents io.ReadSeeker) (*apigen.ObjectStats, error) {
pr, pw := io.Pipe()
@@ -299,33 +317,59 @@ func (u *presignUpload) initMultipart(ctx context.Context) (*apigen.PresignMulti
}

func (u *presignUpload) uploadPart(ctx context.Context, partReader *io.SectionReader, partURL string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, partURL, partReader)
if err != nil {
return "", err
}
req.ContentLength = partReader.Size()
if u.contentType != "" {
req.Header.Set("Content-Type", u.contentType)
}
var etag string

resp, err := u.uploader.HTTPClient.Do(req)
if err != nil {
return "", err
}
defer func() { _ = resp.Body.Close() }()
if !httputil.IsSuccessStatusCode(resp) {
return "", fmt.Errorf("upload %s part(%s): %w", partURL, resp.Status, ErrRequestFailed)
operation := func() error {
// Reset reader position for each attempt
if _, err := partReader.Seek(0, io.SeekStart); err != nil {
return backoff.Permanent(fmt.Errorf("failed to reset reader: %w", err))
}

req, err := http.NewRequestWithContext(ctx, http.MethodPut, partURL, partReader)
if err != nil {
return backoff.Permanent(err)
}
req.ContentLength = partReader.Size()
if u.contentType != "" {
req.Header.Set("Content-Type", u.contentType)
}

resp, err := u.uploader.HTTPClient.Do(req)
if err != nil {
return backoff.Permanent(err)
}
defer func() { _ = resp.Body.Close() }()

if !httputil.IsSuccessStatusCode(resp) {
err := fmt.Errorf("upload %s part(%s): %w", partURL, resp.Status, ErrRequestFailed)
return backoff.Permanent(err)
}

etag = extractEtagFromResponseHeader(resp.Header)
if etag == "" {
err := fmt.Errorf("upload etag is missing %s: %w", partURL, ErrRequestFailed)
return backoff.Permanent(err)
}

return nil
}

etag := extractEtagFromResponseHeader(resp.Header)
if etag == "" {
return "", fmt.Errorf("upload etag is missing %s: %w", partURL, ErrRequestFailed)
b := backoff.WithContext(newUploadBackoff(), ctx)
if err := backoff.Retry(operation, b); err != nil {
return "", fmt.Errorf("failed to upload part: %w", err)
}
return etag, nil
}
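
The `Seek(0, io.SeekStart)` at the top of the operation matters because an `io.SectionReader` keeps its own read offset: a failed attempt may already have consumed part of the body, and without the rewind a retried PUT would send a truncated part. A tiny sketch of the failure mode:

```go
func rewindBetweenAttempts(f io.ReaderAt, offset, partSize int64) error {
	sr := io.NewSectionReader(f, offset, partSize)
	_, _ = io.CopyN(io.Discard, sr, 10) // pretend a failed attempt read 10 bytes
	// Without this rewind, the next attempt would start 10 bytes in.
	if _, err := sr.Seek(0, io.SeekStart); err != nil {
		return err
	}
	return nil // sr now serves the full part from the beginning again
}
```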

func (u *presignUpload) uploadObject(ctx context.Context) (*apigen.ObjectStats, error) {
stagingLocation, err := getPhysicalAddress(ctx, u.uploader.Client, u.repoID, u.branchID, &apigen.GetPhysicalAddressParams{
var (
etag string
stagingLocation *apigen.StagingLocation
)

// get physical address for the object upload (done once before retries)
var err error
stagingLocation, err = getPhysicalAddress(ctx, u.uploader.Client, u.repoID, u.branchID, &apigen.GetPhysicalAddressParams{
Path: u.objectPath,
Presign: swag.Bool(true),
})
@@ -334,38 +378,61 @@ func (u *presignUpload) uploadObject(ctx context.Context) (*apigen.ObjectStats,
}
preSignURL := swag.StringValue(stagingLocation.PresignedUrl)

var body io.ReadSeeker
if u.size > 0 {
// Passing Reader with content length == 0 results in 501 Not Implemented
body = u.reader
}
operation := func() error {
// Reset reader position for each attempt
if _, err := u.reader.Seek(0, io.SeekStart); err != nil {
return backoff.Permanent(fmt.Errorf("failed to reset reader: %w", err))
}

req, err := http.NewRequestWithContext(ctx, http.MethodPut, preSignURL, body)
if err != nil {
return nil, err
}
req.ContentLength = u.size
if u.contentType != "" {
req.Header.Set("Content-Type", u.contentType)
}
if isAzureBlobURL(req.URL) {
req.Header.Set("x-ms-blob-type", "BlockBlob")
}
var body io.ReadSeeker
if u.size > 0 {
// Passing Reader with content length == 0 results in 501 Not Implemented
body = u.reader
}

putResp, err := u.uploader.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer func() { _ = putResp.Body.Close() }()
if !httputil.IsSuccessStatusCode(putResp) {
return nil, fmt.Errorf("upload %w %s: %s", ErrRequestFailed, preSignURL, putResp.Status)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, preSignURL, body)
if err != nil {
return backoff.Permanent(err)
}
req.ContentLength = u.size
if u.contentType != "" {
req.Header.Set("Content-Type", u.contentType)
}
if isAzureBlobURL(req.URL) {
req.Header.Set("x-ms-blob-type", "BlockBlob")
}

resp, err := u.uploader.HTTPClient.Do(req)
if err != nil {
return backoff.Permanent(err)
}
defer func() { _ = resp.Body.Close() }()

// Read response body to ensure upload completed
_, readErr := io.Copy(io.Discard, resp.Body)
if readErr != nil {
return readErr
}

if !httputil.IsSuccessStatusCode(resp) {
err := fmt.Errorf("upload %w %s: %s", ErrRequestFailed, preSignURL, resp.Status)
return backoff.Permanent(err)
}

etag = extractEtagFromResponseHeader(resp.Header)
if etag == "" {
err := fmt.Errorf("upload %w %s: etag is missing", ErrRequestFailed, preSignURL)
return backoff.Permanent(err)
}
return nil
}

etag := extractEtagFromResponseHeader(putResp.Header)
if etag == "" {
return nil, fmt.Errorf("upload %w %s: etag is missing", ErrRequestFailed, preSignURL)
b := backoff.WithContext(newUploadBackoff(), ctx)
if err := backoff.Retry(operation, b); err != nil {
return nil, fmt.Errorf("failed to upload object: %w", err)
}

// Link the physical address. Not wrapped in the retry loop: the upload above already succeeded, and the link call is idempotent
linkReqBody := apigen.LinkPhysicalAddressJSONRequestBody{
Checksum: etag,
SizeBytes: u.size,
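
One more detail worth noting in `uploadObject`: the response body is drained with `io.Copy(io.Discard, ...)` before the status check. Reading a body to EOF before closing it lets `net/http` return the keep-alive connection to the pool, which matters once the same client issues several retry attempts. The general shape, as an illustrative helper:

```go
func doAndDrain(client *http.Client, req *http.Request) (int, error) {
	resp, err := client.Do(req)
	if err != nil {
		return 0, err
	}
	defer func() { _ = resp.Body.Close() }()
	// Drain to EOF so the connection can be reused by the next attempt.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return 0, err // a broken body read counts as a failed attempt
	}
	return resp.StatusCode, nil
}
```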