From 1398d993d512edebb662042d9540d40847500451 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Fri, 13 Jul 2018 13:47:14 +0200 Subject: [PATCH] Parallize task executions --- pkg/controller/actions.go | 27 +++++++++++++++++++++++++++ pkg/controller/invocation/rules.go | 13 ++++--------- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/pkg/controller/actions.go b/pkg/controller/actions.go index 9306cc0f..a9598e77 100644 --- a/pkg/controller/actions.go +++ b/pkg/controller/actions.go @@ -1,6 +1,8 @@ package controller import ( + "sync" + "sync/atomic" "time" "github.com/fission/fission-workflows/pkg/fes" @@ -71,3 +73,28 @@ func (a *ActionError) Apply() error { func (a *ActionError) Eval(rule EvalContext) Action { return a } + +type MultiAction struct { + Actions []Action +} + +func (a *MultiAction) Apply() error { + var wg sync.WaitGroup + var multiErr atomic.Value + wg.Add(len(a.Actions)) + for _, action := range a.Actions { + go func(action Action) { + err := action.Apply() + if err != nil { + multiErr.Store(err) + } + wg.Done() + }(action) + } + wg.Wait() + err := multiErr.Load() + if err == nil { + return nil + } + return err.(error) +} diff --git a/pkg/controller/invocation/rules.go b/pkg/controller/invocation/rules.go index 1c69c917..c5e8a994 100644 --- a/pkg/controller/invocation/rules.go +++ b/pkg/controller/invocation/rules.go @@ -91,6 +91,7 @@ func (sf *RuleSchedule) Eval(cec controller.EvalContext) controller.Action { } // Execute the actions as specified in the execution plan + var actions []controller.Action for _, a := range schedule.Actions { switch a.Type { case scheduler.ActionType_ABORT: @@ -110,18 +111,18 @@ func (sf *RuleSchedule) Eval(cec controller.EvalContext) controller.Action { if err != nil { log.Errorf("Failed to unpack Scheduler action: %v", err) } - return &ActionInvokeTask{ + actions = append(actions, &ActionInvokeTask{ Wf: wf, Wfi: wfi, API: sf.FunctionAPI, Task: invokeAction, StateStore: sf.StateStore, - } + }) default: log.Warnf("Unknown Scheduler action: '%v'", a) } } - return nil + return &controller.MultiAction{Actions: actions} } type RuleCheckIfCompleted struct { @@ -149,12 +150,6 @@ func (cc *RuleCheckIfCompleted) Eval(cec controller.EvalContext) controller.Acti if finished { var finalOutput *types.TypedValue if len(wf.Spec.OutputTask) != 0 { - //t, ok := wfi.Status.Tasks[wf.Spec.OutputTask] - //if !ok { - // return &controller.ActionError{ - // Err: errors.New("could not find output task status in completed invocation"), - // } - //} finalOutput = typedvalues.ResolveTaskOutput(wf.Spec.OutputTask, wfi) }