Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/swift: better retries and bug fixes #552

Merged
merged 1 commit into from
May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cmd/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,23 @@ import (
"github.com/golang/protobuf/jsonpb"
cmdutil "github.com/ohsu-comp-bio/funnel/cmd/util"
"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/logger"
"github.com/ohsu-comp-bio/funnel/storage"
"github.com/ohsu-comp-bio/funnel/tes"
"github.com/spf13/cobra"
)

// log is the package-level logger, created under the "storage" name with the
// logger package's default configuration. newStorage attaches it to the
// storage clients it builds.
var log = logger.NewLogger("storage", logger.DefaultConfig())

// newStorage builds the storage multiplexer described by conf and attaches
// the package-level logger to it before handing it back. Any error from
// storage.NewMux is returned unchanged together with a nil Storage.
func newStorage(conf config.Config) (storage.Storage, error) {
	mux, err := storage.NewMux(conf)
	if err == nil {
		mux.AttachLogger(log)
		return mux, nil
	}
	return nil, err
}

// NewCommand returns the "storage" subcommands.
func NewCommand() *cobra.Command {

Expand Down Expand Up @@ -46,7 +58,7 @@ func NewCommand() *cobra.Command {
return cmd.Usage()
}

store, err := storage.NewMux(conf)
store, err := newStorage(conf)
if err != nil {
return fmt.Errorf("creating storage clients: %s", err)
}
Expand All @@ -73,7 +85,7 @@ func NewCommand() *cobra.Command {
return cmd.Usage()
}

store, err := storage.NewMux(conf)
store, err := newStorage(conf)
if err != nil {
return fmt.Errorf("creating storage clients: %s", err)
}
Expand Down Expand Up @@ -131,7 +143,7 @@ func NewCommand() *cobra.Command {
return cmd.Usage()
}

store, err := storage.NewMux(conf)
store, err := newStorage(conf)
if err != nil {
return fmt.Errorf("creating storage clients: %s", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger) (*wo
if err != nil {
return nil, fmt.Errorf("failed to instantiate Storage backend: %v", err)
}
store.AttachLogger(log)

w := &worker.DefaultWorker{
Conf: conf.Worker,
Expand Down
2 changes: 1 addition & 1 deletion config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func DefaultConfig() Config {
},
},
Swift: SwiftStorage{
MaxRetries: 3,
MaxRetries: 20,
ChunkSizeBytes: int64(500 * units.MB),
},
}
Expand Down
14 changes: 14 additions & 0 deletions storage/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/logger"
)

// operation codes help multiplex storage operations across multiple backends.
Expand Down Expand Up @@ -167,6 +169,18 @@ func (mux *Mux) UnsupportedOperations(url string) UnsupportedOperations {
return unsupported
}

// AttachLogger routes retry notifications to the given logger.
//
// Every backend in mux.Backends that is a *Retrier gets its Notify callback
// replaced with one that emits a "Retrying" warning carrying the error and
// the sleep duration. Backends of any other type are left untouched.
func (mux *Mux) AttachLogger(log *logger.Logger) {
	// The callback only captures log, so a single closure is shared.
	warn := func(err error, sleep time.Duration) {
		log.Warn("Retrying", "error", err, "sleep", sleep)
	}

	for _, backend := range mux.Backends {
		retrier, ok := backend.(*Retrier)
		if !ok {
			continue
		}
		retrier.Retrier.Notify = warn
	}
}

func (mux *Mux) findBackend(url string, op operation) (Storage, error) {
var found = 0
var useBackend Storage
Expand Down
97 changes: 53 additions & 44 deletions storage/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,34 @@ func NewSwiftRetrier(conf config.SwiftStorage) (*Retrier, error) {
Backend: b,
Retrier: &util.Retrier{
MaxTries: conf.MaxRetries,
InitialInterval: time.Second * 5,
MaxInterval: time.Minute * 5,
InitialInterval: 500 * time.Millisecond,
MaxInterval: 5 * time.Minute,
Multiplier: 2.0,
RandomizationFactor: 0.5,
MaxElapsedTime: 0,
ShouldRetry: func(err error) bool {
// Retry on errors that swift names specifically.
if err == swift.ObjectCorrupted || err == swift.TimeoutError {
return true
}
// Retry on service unavailable.
if se, ok := err.(*swift.Error); ok {
return se.StatusCode == http.StatusServiceUnavailable
}
return false
},
ShouldRetry: shouldRetry,
},
}, nil
}

// shouldRetry reports whether a failed Swift operation is worth retrying.
//
// Only errors of type *swiftError are considered; anything else is not
// retried. The decision is made on the wrapped cause:
//   - swift.ObjectCorrupted and swift.TimeoutError are retried;
//   - a *swift.Error is retried only for HTTP 503 (service unavailable);
//   - every other cause is not retried.
func shouldRetry(err error) bool {
	wrapped, isSwift := err.(*swiftError)
	if !isSwift {
		return false
	}
	cause := wrapped.err

	// Errors that the swift package names specifically.
	switch cause {
	case swift.ObjectCorrupted, swift.TimeoutError:
		return true
	}

	// Otherwise retry only when the service reports itself unavailable.
	se, ok := cause.(*swift.Error)
	return ok && se.StatusCode == http.StatusServiceUnavailable
}

// Stat returns metadata about the given url, such as checksum.
func (sw *Swift) Stat(ctx context.Context, url string) (*Object, error) {
u, err := sw.parse(url)
Expand All @@ -96,7 +104,7 @@ func (sw *Swift) Stat(ctx context.Context, url string) (*Object, error) {

info, _, err := sw.conn.Object(u.bucket, u.path)
if err != nil {
return nil, fmt.Errorf("getting object info: %s", err)
return nil, &swiftError{"getting object info", url, err}
}
return &Object{
URL: url,
Expand All @@ -118,7 +126,7 @@ func (sw *Swift) List(ctx context.Context, url string) ([]*Object, error) {
Prefix: u.path,
})
if err != nil {
return nil, fmt.Errorf("listing objects by prefix: %s", err)
return nil, &swiftError{"listing objects by prefix", url, err}
}

var objects []*Object
Expand All @@ -135,7 +143,7 @@ func (sw *Swift) List(ctx context.Context, url string) ([]*Object, error) {
}

// Get copies an object from storage to the host path.
func (sw *Swift) Get(ctx context.Context, url, path string) (obj *Object, err error) {
func (sw *Swift) Get(ctx context.Context, url, path string) (*Object, error) {
u, err := sw.parse(url)
if err != nil {
return nil, err
Expand All @@ -144,41 +152,33 @@ func (sw *Swift) Get(ctx context.Context, url, path string) (obj *Object, err er
var checkHash = true
var headers swift.Headers

obj, err = sw.Stat(ctx, url)
obj, err := sw.Stat(ctx, url)
if err != nil {
return
return nil, err
}

f, _, err := sw.conn.ObjectOpen(u.bucket, u.path, checkHash, headers)
if err != nil {
err = fmt.Errorf("initiating download: %s", err)
return
return nil, &swiftError{"initiating download", url, err}
}
defer func() {
cerr := f.Close()
if cerr != nil {
err = fmt.Errorf("closing file %v; %v", err, cerr)
}
}()
defer f.Close()

dest, err := os.Create(path)
if err != nil {
err = fmt.Errorf("creating file: %s", err)
return nil, &swiftError{"creating file", url, err}
}
defer func() {
cerr := dest.Close()
if cerr != nil {
err = fmt.Errorf("%v; %v", err, cerr)
}
}()

_, err = io.Copy(dest, fsutil.Reader(ctx, f))
if err != nil {
err = fmt.Errorf("copying file: %s", err)
return

_, copyErr := io.Copy(dest, fsutil.Reader(ctx, f))
closeErr := dest.Close()

if copyErr != nil {
	// Report the copy failure itself. Wrapping closeErr here would hide the
	// real cause (closeErr is usually nil when the copy fails) and would keep
	// shouldRetry from seeing retryable errors such as swift.ObjectCorrupted.
	return nil, &swiftError{"copying file", url, copyErr}
}
if closeErr != nil {
return nil, &swiftError{"closing file", url, closeErr}
}

return
return obj, nil
}

// Put copies an object (file) from the host path to storage.
Expand All @@ -191,7 +191,7 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) {

reader, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("opening host file %q: %s", path, err)
return nil, &swiftError{"opening host file", url, err}
}
defer reader.Close()

Expand All @@ -216,7 +216,7 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) {
}

if err != nil {
return nil, fmt.Errorf("creating object: %s", err)
return nil, &swiftError{"creating object", url, err}
}

_, copyErr := io.Copy(writer, fsutil.Reader(ctx, reader))
Expand All @@ -225,10 +225,10 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) {
closeErr := writer.Close()

if copyErr != nil {
return nil, fmt.Errorf("copying file: %s", copyErr)
// Wrap the copy error, not closeErr: closeErr may be nil here, which would
// discard the real cause and defeat the retry check on the wrapped error.
return nil, &swiftError{"copying file", url, copyErr}
}
if closeErr != nil {
return nil, fmt.Errorf("closing file: %s", closeErr)
return nil, &swiftError{"closing file", url, closeErr}
}

return sw.Stat(ctx, url)
Expand All @@ -248,7 +248,7 @@ func (sw *Swift) UnsupportedOperations(url string) UnsupportedOperations {
}
_, _, err = sw.conn.Container(u.bucket)
if err != nil {
return AllUnsupported(fmt.Errorf("swift: failed to find bucket: %s. error: %v", u.bucket, err))
return AllUnsupported(&swiftError{"looking for bucket", url, err})
}
return AllSupported()
}
Expand All @@ -274,3 +274,12 @@ func (sw *Swift) parse(rawurl string) (*urlparts, error) {
}
return url, nil
}

// swiftError describes a failed Swift storage operation. It keeps the
// underlying cause as a value (rather than flattening it into a string) so
// that callers can still inspect it.
type swiftError struct {
	// msg says what was being attempted; url is the storage URL involved.
	msg, url string
	// err is the underlying cause of the failure.
	err error
}

// Error implements the error interface. The message has the form
// `swift: <msg> for URL "<url>": <cause>`.
func (s *swiftError) Error() string {
	return fmt.Sprintf("swift: %s for URL %q: %s", s.msg, s.url, s.err)
}

// Unwrap returns the underlying cause, which lets errors.Is and errors.As
// see through the wrapper.
func (s *swiftError) Unwrap() error {
	return s.err
}
9 changes: 8 additions & 1 deletion util/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Retrier struct {
MaxElapsedTime time.Duration
MaxTries int
ShouldRetry func(err error) bool
Notify func(err error, d time.Duration)
backoff backoff.BackOff
}

Expand All @@ -36,7 +37,13 @@ func NewRetrier() *Retrier {
// Retry the function f until it does not return error or BackOff stops.
func (r *Retrier) Retry(ctx context.Context, f func() error) error {
b := backoff.WithContext(r.withTries(), ctx)
return backoff.Retry(func() error { return r.checkErr(f()) }, b)
return backoff.RetryNotify(func() error { return r.checkErr(f()) }, b, r.notify)
}

// notify forwards a retry notification (the error that triggered the retry
// and the delay d) to the optional Notify callback. It does nothing when no
// callback has been set.
func (r *Retrier) notify(err error, d time.Duration) {
	hook := r.Notify
	if hook == nil {
		return
	}
	hook(err, d)
}

func (r *Retrier) checkErr(err error) error {
Expand Down