Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: parallelize downloads and uploads #501

Merged
merged 2 commits into from
Mar 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion util/errs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package util

import (
"errors"
"strings"
)

Expand All @@ -11,7 +12,28 @@ type MultiError []error
func (m MultiError) Error() string {
var strs []string
for _, e := range m {
strs = append(strs, e.Error())
if e != nil {
strs = append(strs, e.Error())
}
}
return strings.Join(strs, "\n")
}

// IsNil returns true if all errors in the slice are nil.
func (m MultiError) IsNil() bool {
isNil := true
for _, e := range m {
if e != nil {
isNil = false
}
}
return isNil
}

// ToError returns an error interface.
func (m MultiError) ToError() error {
if m.IsNil() {
return nil
}
return errors.New(m.Error())
}
92 changes: 62 additions & 30 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/events"
"github.com/ohsu-comp-bio/funnel/proto/tes"
"github.com/ohsu-comp-bio/funnel/storage"
"github.com/ohsu-comp-bio/funnel/util"
"github.com/ohsu-comp-bio/funnel/version"
)

Expand Down Expand Up @@ -112,22 +114,36 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error)
}

// Download inputs
for _, input := range mapper.Inputs {
if run.ok() {
event.Info("Download started", "url", input.Url)
err := r.Store.Get(ctx, input.Url, input.Path, input.Type)
if err != nil {
if err == storage.ErrEmptyDirectory {
event.Warn("Download finished with warning", "url", input.Url, "warning", err)
downloadErrs := make(util.MultiError, len(mapper.Inputs))
downloadCtx, cancelDownloadCtx := context.WithCancel(ctx)
defer cancelDownloadCtx()
wg := &sync.WaitGroup{}
if run.ok() {
for i, input := range mapper.Inputs {
wg.Add(1)
go func(input *tes.Input, i int) {
defer wg.Done()
event.Info("Download started", "url", input.Url)
err := r.Store.Get(downloadCtx, input.Url, input.Path, input.Type)
if err != nil {
if err == storage.ErrEmptyDirectory {
event.Warn("Download finished with warning", "url", input.Url, "warning", err)
} else {
downloadErrs[i] = err
event.Error("Download failed", "url", input.Url, "error", err)
cancelDownloadCtx()
}
} else {
run.syserr = err
event.Error("Download failed", "url", input.Url, "error", err)
event.Info("Download finished", "url", input.Url)
}
} else {
event.Info("Download finished", "url", input.Url)
}
return
}(input, i)
}
}
wg.Wait()
if !downloadErrs.IsNil() {
run.syserr = downloadErrs.ToError()
}

if run.ok() {
event.State(tes.State_RUNNING)
Expand Down Expand Up @@ -164,32 +180,48 @@ func (r *DefaultWorker) Run(pctx context.Context, taskID string) (runerr error)
}

// Upload outputs
var outputs []*tes.OutputFileLog
for _, output := range mapper.Outputs {
if run.ok() {
event.Info("Upload started", "url", output.Url)
r.fixLinks(mapper, output.Path)
out, err := r.Store.Put(ctx, output.Url, output.Path, output.Type)
if err != nil {
if err == storage.ErrEmptyDirectory {
event.Warn("Upload finished with warning", "url", output.Url, "warning", err)
uploadErrs := make(util.MultiError, len(mapper.Outputs))
outputs := make([][]*tes.OutputFileLog, len(mapper.Outputs))
wg = &sync.WaitGroup{}
if run.ok() {
for i, output := range mapper.Outputs {
wg.Add(1)
go func(output *tes.Output, i int) {
defer wg.Done()
event.Info("Upload started", "url", output.Url)
r.fixLinks(mapper, output.Path)
out, err := r.Store.Put(ctx, output.Url, output.Path, output.Type)
if err != nil {
if err == storage.ErrEmptyDirectory {
event.Warn("Upload finished with warning", "url", output.Url, "warning", err)
} else {
uploadErrs[i] = err
event.Error("Upload failed", "url", output.Url, "error", err)
}
} else {
run.syserr = err
event.Error("Upload failed", "url", output.Url, "error", err)
event.Info("Upload finished", "url", output.Url)
}
} else {
event.Info("Upload finished", "url", output.Url)
}
outputs = append(outputs, out...)
outputs[i] = out
return
}(output, i)
}
}
wg.Wait()
outputLog := []*tes.OutputFileLog{}
for _, out := range outputs {
outputLog = append(outputLog, out...)
}
if !uploadErrs.IsNil() {
run.syserr = uploadErrs.ToError()
}

// unmap paths for OutputFileLog
for _, o := range outputs {
for _, o := range outputLog {
o.Path = mapper.ContainerPath(o.Path)
}

if run.ok() {
event.Outputs(outputs)
if len(outputLog) > 0 {
event.Outputs(outputLog)
}

return
Expand Down