diff --git a/cmd/storage/storage.go b/cmd/storage/storage.go index 8efef688b..4da3d9d0b 100644 --- a/cmd/storage/storage.go +++ b/cmd/storage/storage.go @@ -10,11 +10,23 @@ import ( "github.com/golang/protobuf/jsonpb" cmdutil "github.com/ohsu-comp-bio/funnel/cmd/util" "github.com/ohsu-comp-bio/funnel/config" + "github.com/ohsu-comp-bio/funnel/logger" "github.com/ohsu-comp-bio/funnel/storage" "github.com/ohsu-comp-bio/funnel/tes" "github.com/spf13/cobra" ) +var log = logger.NewLogger("storage", logger.DefaultConfig()) + +func newStorage(conf config.Config) (storage.Storage, error) { + store, err := storage.NewMux(conf) + if err != nil { + return nil, err + } + store.AttachLogger(log) + return store, nil +} + // NewCommand returns the "storage" subcommands. func NewCommand() *cobra.Command { @@ -46,7 +58,7 @@ func NewCommand() *cobra.Command { return cmd.Usage() } - store, err := storage.NewMux(conf) + store, err := newStorage(conf) if err != nil { return fmt.Errorf("creating storage clients: %s", err) } @@ -73,7 +85,7 @@ func NewCommand() *cobra.Command { return cmd.Usage() } - store, err := storage.NewMux(conf) + store, err := newStorage(conf) if err != nil { return fmt.Errorf("creating storage clients: %s", err) } @@ -131,7 +143,7 @@ func NewCommand() *cobra.Command { return cmd.Usage() } - store, err := storage.NewMux(conf) + store, err := newStorage(conf) if err != nil { return fmt.Errorf("creating storage clients: %s", err) } diff --git a/cmd/worker/run.go b/cmd/worker/run.go index 67ffb738a..44ec9369d 100644 --- a/cmd/worker/run.go +++ b/cmd/worker/run.go @@ -100,6 +100,7 @@ func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger) (*wo if err != nil { return nil, fmt.Errorf("failed to instantiate Storage backend: %v", err) } + store.AttachLogger(log) w := &worker.DefaultWorker{ Conf: conf.Worker, diff --git a/config/default.go b/config/default.go index de69aa65b..d3514e98e 100644 --- a/config/default.go +++ b/config/default.go @@ -91,7 +91,7 @@ func DefaultConfig() Config { }, }, Swift: SwiftStorage{ - MaxRetries: 3, + MaxRetries: 20, ChunkSizeBytes: int64(500 * units.MB), }, } diff --git a/storage/mux.go b/storage/mux.go index 3f5c4c471..b7485706e 100644 --- a/storage/mux.go +++ b/storage/mux.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "strings" + "time" "github.com/ohsu-comp-bio/funnel/config" + "github.com/ohsu-comp-bio/funnel/logger" ) // operation codes help multiplex storage operations across multiple backends. @@ -167,6 +169,18 @@ func (mux *Mux) UnsupportedOperations(url string) UnsupportedOperations { return unsupported } +// AttachLogger will log information (such as retry warnings) +// to the given logger. +func (mux *Mux) AttachLogger(log *logger.Logger) { + for _, b := range mux.Backends { + if r, ok := b.(*Retrier); ok { + r.Retrier.Notify = func(err error, sleep time.Duration) { + log.Warn("Retrying", "error", err, "sleep", sleep) + } + } + } +} + func (mux *Mux) findBackend(url string, op operation) (Storage, error) { var found = 0 var useBackend Storage diff --git a/storage/swift.go b/storage/swift.go index 460301213..bd6ae8f3e 100644 --- a/storage/swift.go +++ b/storage/swift.go @@ -67,26 +67,34 @@ func NewSwiftRetrier(conf config.SwiftStorage) (*Retrier, error) { Backend: b, Retrier: &util.Retrier{ MaxTries: conf.MaxRetries, - InitialInterval: time.Second * 5, - MaxInterval: time.Minute * 5, + InitialInterval: 500 * time.Millisecond, + MaxInterval: 5 * time.Minute, Multiplier: 2.0, RandomizationFactor: 0.5, MaxElapsedTime: 0, - ShouldRetry: func(err error) bool { - // Retry on errors that swift names specifically. - if err == swift.ObjectCorrupted || err == swift.TimeoutError { - return true - } - // Retry on service unavailable. - if se, ok := err.(*swift.Error); ok { - return se.StatusCode == http.StatusServiceUnavailable - } - return false - }, + ShouldRetry: shouldRetry, }, }, nil } +func shouldRetry(err error) bool { + serr, ok := err.(*swiftError) + if !ok { + return false + } + err = serr.err + + // Retry on errors that swift names specifically. + if err == swift.ObjectCorrupted || err == swift.TimeoutError { + return true + } + // Retry on service unavailable. + if se, ok := err.(*swift.Error); ok { + return se.StatusCode == http.StatusServiceUnavailable + } + return false +} + // Stat returns metadata about the given url, such as checksum. func (sw *Swift) Stat(ctx context.Context, url string) (*Object, error) { u, err := sw.parse(url) @@ -96,7 +104,7 @@ func (sw *Swift) Stat(ctx context.Context, url string) (*Object, error) { info, _, err := sw.conn.Object(u.bucket, u.path) if err != nil { - return nil, fmt.Errorf("getting object info: %s", err) + return nil, &swiftError{"getting object info", url, err} } return &Object{ URL: url, @@ -118,7 +126,7 @@ func (sw *Swift) List(ctx context.Context, url string) ([]*Object, error) { Prefix: u.path, }) if err != nil { - return nil, fmt.Errorf("listing objects by prefix: %s", err) + return nil, &swiftError{"listing objects by prefix", url, err} } var objects []*Object @@ -135,7 +143,7 @@ func (sw *Swift) List(ctx context.Context, url string) ([]*Object, error) { } // Get copies an object from storage to the host path. -func (sw *Swift) Get(ctx context.Context, url, path string) (obj *Object, err error) { +func (sw *Swift) Get(ctx context.Context, url, path string) (*Object, error) { u, err := sw.parse(url) if err != nil { return nil, err @@ -144,41 +152,33 @@ func (sw *Swift) Get(ctx context.Context, url, path string) (obj *Object, err er var checkHash = true var headers swift.Headers - obj, err = sw.Stat(ctx, url) + obj, err := sw.Stat(ctx, url) if err != nil { - return + return nil, err } f, _, err := sw.conn.ObjectOpen(u.bucket, u.path, checkHash, headers) if err != nil { - err = fmt.Errorf("initiating download: %s", err) - return + return nil, &swiftError{"initiating download", url, err} } - defer func() { - cerr := f.Close() - if cerr != nil { - err = fmt.Errorf("closing file %v; %v", err, cerr) - } - }() + defer f.Close() dest, err := os.Create(path) if err != nil { - err = fmt.Errorf("creating file: %s", err) + return nil, &swiftError{"creating file", url, err} } - defer func() { - cerr := dest.Close() - if cerr != nil { - err = fmt.Errorf("%v; %v", err, cerr) - } - }() - - _, err = io.Copy(dest, fsutil.Reader(ctx, f)) - if err != nil { - err = fmt.Errorf("copying file: %s", err) - return + + _, copyErr := io.Copy(dest, fsutil.Reader(ctx, f)) + closeErr := dest.Close() + + if copyErr != nil { + return nil, &swiftError{"copying file", url, closeErr} + } + if closeErr != nil { + return nil, &swiftError{"closing file", url, closeErr} } - return + return obj, nil } // Put copies an object (file) from the host path to storage. @@ -191,7 +191,7 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) { reader, err := os.Open(path) if err != nil { - return nil, fmt.Errorf("opening host file %q: %s", path, err) + return nil, &swiftError{"opening host file", url, err} } defer reader.Close() @@ -216,7 +216,7 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) { } if err != nil { - return nil, fmt.Errorf("creating object: %s", err) + return nil, &swiftError{"creating object", url, err} } _, copyErr := io.Copy(writer, fsutil.Reader(ctx, reader)) @@ -225,10 +225,10 @@ func (sw *Swift) Put(ctx context.Context, url, path string) (*Object, error) { closeErr := writer.Close() if copyErr != nil { - return nil, fmt.Errorf("copying file: %s", copyErr) + return nil, &swiftError{"copying file", url, closeErr} } if closeErr != nil { - return nil, fmt.Errorf("closing file: %s", closeErr) + return nil, &swiftError{"closing file", url, closeErr} } return sw.Stat(ctx, url) @@ -248,7 +248,7 @@ func (sw *Swift) UnsupportedOperations(url string) UnsupportedOperations { } _, _, err = sw.conn.Container(u.bucket) if err != nil { - return AllUnsupported(fmt.Errorf("swift: failed to find bucket: %s. error: %v", u.bucket, err)) + return AllUnsupported(&swiftError{"looking for bucket", url, err}) } return AllSupported() } @@ -274,3 +274,12 @@ func (sw *Swift) parse(rawurl string) (*urlparts, error) { } return url, nil } + +type swiftError struct { + msg, url string + err error +} + +func (s *swiftError) Error() string { + return fmt.Sprintf("swift: %s for URL %q: %s", s.msg, s.url, s.err) +} diff --git a/util/retry.go b/util/retry.go index 6850273b7..86a1bcf78 100644 --- a/util/retry.go +++ b/util/retry.go @@ -16,6 +16,7 @@ type Retrier struct { MaxElapsedTime time.Duration MaxTries int ShouldRetry func(err error) bool + Notify func(err error, d time.Duration) backoff backoff.BackOff } @@ -36,7 +37,13 @@ func NewRetrier() *Retrier { // Retry the function f until it does not return error or BackOff stops. func (r *Retrier) Retry(ctx context.Context, f func() error) error { b := backoff.WithContext(r.withTries(), ctx) - return backoff.Retry(func() error { return r.checkErr(f()) }, b) + return backoff.RetryNotify(func() error { return r.checkErr(f()) }, b, r.notify) +} + +func (r *Retrier) notify(err error, d time.Duration) { + if r.Notify != nil { + r.Notify(err, d) + } } func (r *Retrier) checkErr(err error) error {