From 2ff243c8f895972da9fc5d41d8736e6d494b9198 Mon Sep 17 00:00:00 2001 From: adamstruck Date: Tue, 6 Mar 2018 11:50:49 -0800 Subject: [PATCH 1/2] worker: parallelize downloads and uploads --- util/errs.go | 24 ++++++++++++- worker/worker.go | 92 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 85 insertions(+), 31 deletions(-) diff --git a/util/errs.go b/util/errs.go index 3fecd100f..6fc21febf 100644 --- a/util/errs.go +++ b/util/errs.go @@ -1,6 +1,7 @@ package util import ( + "errors" "strings" ) @@ -11,7 +12,28 @@ type MultiError []error func (m MultiError) Error() string { var strs []string for _, e := range m { - strs = append(strs, e.Error()) + if e != nil { + strs = append(strs, e.Error()) + } } return strings.Join(strs, "\n") } + +// IsNil returns true if all errors in the slice are nil. +func (m MultiError) IsNil() bool { + isNil := true + for _, e := range m { + if e != nil { + isNil = false + } + } + return isNil +} + +// ToError returns an error interface. +func (m MultiError) ToError() error { + if m.IsNil() { + return nil + } + return errors.New(m.Error()) +} diff --git a/worker/worker.go b/worker/worker.go index 6b5502c5d..c0c667397 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,12 +5,14 @@ import ( "fmt" "os" "path/filepath" + "sync" "time" "github.com/ohsu-comp-bio/funnel/config" "github.com/ohsu-comp-bio/funnel/events" "github.com/ohsu-comp-bio/funnel/proto/tes" "github.com/ohsu-comp-bio/funnel/storage" + "github.com/ohsu-comp-bio/funnel/util" "github.com/ohsu-comp-bio/funnel/version" ) @@ -112,22 +114,36 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) } // Download inputs - for _, input := range mapper.Inputs { - if run.ok() { - event.Info("Download started", "url", input.Url) - err := r.Store.Get(ctx, input.Url, input.Path, input.Type) - if err != nil { - if err == storage.ErrEmptyDirectory { - event.Warn("Download finished with warning", "url", input.Url, "warning", err) + downloadErrs := make(util.MultiError, len(mapper.Inputs)) + downloadCtx, cancelDownloadCtx := context.WithCancel(ctx) + defer cancelDownloadCtx() + wg := &sync.WaitGroup{} + wg.Add(len(mapper.Inputs)) + if run.ok() { + for i, input := range mapper.Inputs { + go func(input *tes.Input, i int) { + defer wg.Done() + event.Info("Download started", "url", input.Url) + err := r.Store.Get(downloadCtx, input.Url, input.Path, input.Type) + if err != nil { + if err == storage.ErrEmptyDirectory { + event.Warn("Download finished with warning", "url", input.Url, "warning", err) + } else { + downloadErrs[i] = err + event.Error("Download failed", "url", input.Url, "error", err) + cancelDownloadCtx() + } } else { - run.syserr = err - event.Error("Download failed", "url", input.Url, "error", err) + event.Info("Download finished", "url", input.Url) } - } else { - event.Info("Download finished", "url", input.Url) - } + return + }(input, i) } } + wg.Wait() + if !downloadErrs.IsNil() { + run.syserr = fmt.Errorf("%s", downloadErrs.Error()) + } if run.ok() { event.State(tes.State_RUNNING) @@ -164,32 +180,48 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) } // Upload outputs - var outputs []*tes.OutputFileLog - for _, output := range mapper.Outputs { - if run.ok() { - event.Info("Upload started", "url", output.Url) - r.fixLinks(mapper, output.Path) - out, err := r.Store.Put(ctx, output.Url, output.Path, output.Type) - if err != nil { - if err == storage.ErrEmptyDirectory { - event.Warn("Upload finished with warning", "url", output.Url, "warning", err) + uploadErrs := make(util.MultiError, len(mapper.Outputs)) + outputs := make([][]*tes.OutputFileLog, len(mapper.Outputs)) + wg = &sync.WaitGroup{} + wg.Add(len(mapper.Outputs)) + if run.ok() { + for i, output := range mapper.Outputs { + go func(output *tes.Output, i int) { + defer wg.Done() + event.Info("Upload started", "url", output.Url) + r.fixLinks(mapper, output.Path) + out, err := r.Store.Put(ctx, output.Url, output.Path, output.Type) + if err != nil { + if err == storage.ErrEmptyDirectory { + event.Warn("Upload finished with warning", "url", output.Url, "warning", err) + } else { + uploadErrs[i] = err + event.Error("Upload failed", "url", output.Url, "error", err) + } } else { - run.syserr = err - event.Error("Upload failed", "url", output.Url, "error", err) + event.Info("Upload finished", "url", output.Url) } - } else { - event.Info("Upload finished", "url", output.Url) - } - outputs = append(outputs, out...) + outputs[i] = out + return + }(output, i) } } + wg.Wait() + outputLog := []*tes.OutputFileLog{} + for _, out := range outputs { + outputLog = append(outputLog, out...) + } + if !uploadErrs.IsNil() { + run.syserr = fmt.Errorf("%s", uploadErrs.Error()) + } + // unmap paths for OutputFileLog - for _, o := range outputs { + for _, o := range outputLog { o.Path = mapper.ContainerPath(o.Path) } - if run.ok() { - event.Outputs(outputs) + if len(outputLog) > 0 { + event.Outputs(outputLog) } return From bc76060bd64bbdaa1d6b2dc56aa55c9d81dd3cbb Mon Sep 17 00:00:00 2001 From: adamstruck Date: Tue, 6 Mar 2018 13:24:35 -0800 Subject: [PATCH 2/2] fix wait group issue --- worker/worker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index c0c667397..6970949d2 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -118,9 +118,9 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) downloadCtx, cancelDownloadCtx := context.WithCancel(ctx) defer cancelDownloadCtx() wg := &sync.WaitGroup{} - wg.Add(len(mapper.Inputs)) if run.ok() { for i, input := range mapper.Inputs { + wg.Add(1) go func(input *tes.Input, i int) { defer wg.Done() event.Info("Download started", "url", input.Url) @@ -142,7 +142,7 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) } wg.Wait() if !downloadErrs.IsNil() { - run.syserr = fmt.Errorf("%s", downloadErrs.Error()) + run.syserr = downloadErrs.ToError() } if run.ok() { @@ -183,9 +183,9 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) uploadErrs := make(util.MultiError, len(mapper.Outputs)) outputs := make([][]*tes.OutputFileLog, len(mapper.Outputs)) wg = &sync.WaitGroup{} - wg.Add(len(mapper.Outputs)) if run.ok() { for i, output := range mapper.Outputs { + wg.Add(1) go func(output *tes.Output, i int) { defer wg.Done() event.Info("Upload started", "url", output.Url) @@ -212,7 +212,7 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error) outputLog = append(outputLog, out...) } if !uploadErrs.IsNil() { - run.syserr = fmt.Errorf("%s", uploadErrs.Error()) + run.syserr = uploadErrs.ToError() } // unmap paths for OutputFileLog