Skip to content

Commit

Permalink
Merge pull request #170 from fission/fix-parallel-tasks
Browse files Browse the repository at this point in the history
Parallelize task executions
  • Loading branch information
erwinvaneyk authored Jul 17, 2018
2 parents d7a7167 + 1398d99 commit dca64b0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
27 changes: 27 additions & 0 deletions pkg/controller/actions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package controller

import (
"sync"
"sync/atomic"
"time"

"github.com/fission/fission-workflows/pkg/fes"
Expand Down Expand Up @@ -71,3 +73,28 @@ func (a *ActionError) Apply() error {
func (a *ActionError) Eval(rule EvalContext) Action {
return a
}

type MultiAction struct {
Actions []Action
}

func (a *MultiAction) Apply() error {
var wg sync.WaitGroup
var multiErr atomic.Value
wg.Add(len(a.Actions))
for _, action := range a.Actions {
go func(action Action) {
err := action.Apply()
if err != nil {
multiErr.Store(err)
}
wg.Done()
}(action)
}
wg.Wait()
err := multiErr.Load()
if err == nil {
return nil
}
return err.(error)
}
13 changes: 4 additions & 9 deletions pkg/controller/invocation/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (sf *RuleSchedule) Eval(cec controller.EvalContext) controller.Action {
}

// Execute the actions as specified in the execution plan
var actions []controller.Action
for _, a := range schedule.Actions {
switch a.Type {
case scheduler.ActionType_ABORT:
Expand All @@ -110,18 +111,18 @@ func (sf *RuleSchedule) Eval(cec controller.EvalContext) controller.Action {
if err != nil {
log.Errorf("Failed to unpack Scheduler action: %v", err)
}
return &ActionInvokeTask{
actions = append(actions, &ActionInvokeTask{
Wf: wf,
Wfi: wfi,
API: sf.FunctionAPI,
Task: invokeAction,
StateStore: sf.StateStore,
}
})
default:
log.Warnf("Unknown Scheduler action: '%v'", a)
}
}
return nil
return &controller.MultiAction{Actions: actions}
}

type RuleCheckIfCompleted struct {
Expand Down Expand Up @@ -149,12 +150,6 @@ func (cc *RuleCheckIfCompleted) Eval(cec controller.EvalContext) controller.Acti
if finished {
var finalOutput *types.TypedValue
if len(wf.Spec.OutputTask) != 0 {
//t, ok := wfi.Status.Tasks[wf.Spec.OutputTask]
//if !ok {
// return &controller.ActionError{
// Err: errors.New("could not find output task status in completed invocation"),
// }
//}
finalOutput = typedvalues.ResolveTaskOutput(wf.Spec.OutputTask, wfi)
}

Expand Down

0 comments on commit dca64b0

Please sign in to comment.