From 914698a462fae6964d2bed23bfb1837117627a0a Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Wed, 6 Jun 2018 10:32:02 +0200 Subject: [PATCH 1/8] Implemented Heap for EvalStates --- pkg/controller/actions.go | 6 +- pkg/controller/controller.go | 2 +- pkg/controller/evaluation.go | 443 +++++++++++++++++++++--- pkg/controller/evaluation_test.go | 24 +- pkg/controller/invocation/actions.go | 4 +- pkg/controller/invocation/controller.go | 136 ++++---- pkg/controller/invocation/rules.go | 1 - pkg/controller/rules.go | 2 +- pkg/controller/workflow/controller.go | 45 +-- pkg/controller/workflow/rules.go | 2 +- pkg/util/gopool/gopool.go | 39 +++ 11 files changed, 561 insertions(+), 143 deletions(-) create mode 100644 pkg/util/gopool/gopool.go diff --git a/pkg/controller/actions.go b/pkg/controller/actions.go index a9598e77..1e763cc4 100644 --- a/pkg/controller/actions.go +++ b/pkg/controller/actions.go @@ -8,7 +8,7 @@ import ( "github.com/fission/fission-workflows/pkg/fes" ) -// TODO remove from EvalCache actions +// TODO remove from EvalStore actions type ActionWait struct { EvalState *EvalState @@ -35,12 +35,12 @@ func (a *ActionSkip) Eval(rule EvalContext) Action { } type ActionRemoveFromEvalCache struct { - EvalCache *EvalCache + EvalCache EvalStore ID string } func (a *ActionRemoveFromEvalCache) Apply() error { - a.EvalCache.Del(a.ID) + a.EvalCache.Delete(a.ID) return nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a698d556..e44c8f79 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -45,7 +45,7 @@ var ( Namespace: "workflows", Subsystem: "controller_workflow", Name: "eval_queue_size", - Help: "A gauge of the evaluation queue size", + Help: "A gauge of the evaluation queue cachedLen", }, []string{"controller"}) ) diff --git a/pkg/controller/evaluation.go b/pkg/controller/evaluation.go index 78893ac7..9577dc71 100644 --- a/pkg/controller/evaluation.go +++ b/pkg/controller/evaluation.go @@ -1,8 +1,11 @@ package controller import ( + "container/heap" + "fmt" "strings" "sync" + "sync/atomic" "time" "github.com/opentracing/opentracing-go" @@ -11,65 +14,67 @@ import ( "github.com/sirupsen/logrus" ) -// EvalCache allows storing and retrieving EvalStates in a thread-safe way. -type EvalCache struct { - states map[string]*EvalState - lock sync.RWMutex +// EvalStore allows storing and retrieving EvalStates in a thread-safe way. 
+type EvalStore struct { + states sync.Map + cachedLen *int32 } -func NewEvalCache() *EvalCache { - return &EvalCache{ - states: map[string]*EvalState{}, +func (e *EvalStore) Len() int { + if e.cachedLen == nil { + var count int32 + e.states.Range(func(k, v interface{}) bool { + count++ + return true + }) + atomic.StoreInt32(e.cachedLen, count) + return int(count) } + return int(atomic.LoadInt32(e.cachedLen)) } -func (e *EvalCache) GetOrCreate(id string, spanCtx opentracing.SpanContext) *EvalState { - s, ok := e.Get(id) - if !ok { - s = NewEvalState(id, spanCtx) - e.Put(s) +func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) *EvalState { + s, loaded := e.states.LoadOrStore(id, NewEvalState(id, spanCtx)) + if !loaded && e.cachedLen != nil { + atomic.AddInt32(e.cachedLen, 1) } - return s + return s.(*EvalState) } -func (e *EvalCache) Get(id string) (*EvalState, bool) { - e.lock.RLock() - s, ok := e.states[id] - e.lock.RUnlock() - return s, ok +func (e *EvalStore) Load(id string) (*EvalState, bool) { + s, ok := e.states.Load(id) + if !ok { + return nil, false + } + return s.(*EvalState), true } -func (e *EvalCache) Put(state *EvalState) { - e.lock.Lock() - e.states[state.id] = state - e.lock.Unlock() +func (e *EvalStore) Store(state *EvalState) { + e.states.Store(state.id, state) + e.cachedLen = nil // We are not sure if an entry was replaced or added } -func (e *EvalCache) Del(id string) { - e.lock.Lock() - delete(e.states, id) - e.lock.Unlock() +func (e *EvalStore) Delete(id string) { + e.states.Delete(id) + e.cachedLen = nil // We are not sure if an entry was removed } -func (e *EvalCache) List() map[string]*EvalState { +func (e *EvalStore) List() map[string]*EvalState { results := map[string]*EvalState{} - e.lock.RLock() - for id, state := range e.states { - results[id] = state - } - e.lock.RUnlock() + e.states.Range(func(k, v interface{}) bool { + results[k.(string)] = v.(*EvalState) + return true + }) return results } -func (e *EvalCache) Close() error { - e.lock.RLock() - for _, es := range e.states { +func (e *EvalStore) Close() error { + for _, es := range e.List() { err := es.Close() if err != nil { logrus.Errorf("Failed to close evaluation state: %v", err) } } - e.lock.RUnlock() return nil } @@ -162,10 +167,13 @@ func (e *EvalState) Free() { } func (e *EvalState) ID() string { + if e == nil { + return "" + } return e.id } -func (e *EvalState) Count() int { +func (e *EvalState) Len() int { e.dataLock.RLock() defer e.dataLock.RUnlock() return len(e.log) @@ -234,19 +242,19 @@ func NewEvalRecord() EvalRecord { // EvalLog is a time-ordered log of evaluation records. Newer records are appended to the end of the log. 
type EvalLog []EvalRecord -func (e EvalLog) Count() int { +func (e EvalLog) Len() int { return len(e) } func (e EvalLog) Last() (EvalRecord, bool) { - if e.Count() == 0 { + if e.Len() == 0 { return EvalRecord{}, false } return e[len(e)-1], true } func (e EvalLog) First() (EvalRecord, bool) { - if e.Count() == 0 { + if e.Len() == 0 { return EvalRecord{}, false } return e[0], true @@ -255,3 +263,358 @@ func (e EvalLog) First() (EvalRecord, bool) { func (e *EvalLog) Record(record EvalRecord) { *e = append(*e, record) } + +type heapCmdType string + +const ( + heapCmdPush heapCmdType = "push" + heapCmdFront heapCmdType = "front" + heapCmdPop heapCmdType = "pop" + heapCmdUpdate heapCmdType = "update" + heapCmdGet heapCmdType = "get" + heapCmdLength heapCmdType = "len" + DefaultPriority = 0 +) + +type heapCmd struct { + cmd heapCmdType + input interface{} + result chan<- interface{} +} + +type ConcurrentEvalStateHeap struct { + heap *EvalStateHeap + cmdChan chan heapCmd + closeChan chan bool + queueChan chan *EvalState + init sync.Once + activeGoRoutines sync.WaitGroup +} + +func NewConcurrentEvalStateHeap(unique bool) *ConcurrentEvalStateHeap { + h := &ConcurrentEvalStateHeap{ + heap: NewEvalStateHeap(unique), + cmdChan: make(chan heapCmd, 50), + closeChan: make(chan bool), + queueChan: make(chan *EvalState), + } + h.Init() + heap.Init(h.heap) + return h +} + +func (h *ConcurrentEvalStateHeap) Init() { + h.init.Do(func() { + front := make(chan *EvalState, 1) + updateFront := func() { + es := h.heap.Front().GetEvalState() + front <- es + } + + // Channel supplier + go func() { + h.activeGoRoutines.Add(1) + var next *EvalState + for { + if next != nil { + select { + case h.queueChan <- next: + // Queue item has been fetched; get next one. + next = nil + // heap.Pop(h.heap) // RACE with command handler + // updateFront() + h.Pop() + case u := <-front: + // There has been an update to the queue item; replace it. 
+ next = u + case <-h.closeChan: + h.activeGoRoutines.Done() + return + } + } else { + // There is no current queue item, so we only listen to updates + select { + case u := <-front: + next = u + case <-h.closeChan: + h.activeGoRoutines.Done() + return + } + } + } + }() + + // Command handler + go func() { + h.activeGoRoutines.Add(1) + updateFront() + for { + select { + case cmd := <-h.cmdChan: + switch cmd.cmd { + case heapCmdFront: + cmd.result <- h.heap.Front() + case heapCmdLength: + cmd.result <- h.heap.Len() + case heapCmdPush: + heap.Push(h.heap, cmd.input) + updateFront() + case heapCmdPop: + cmd.result <- heap.Pop(h.heap) + updateFront() + case heapCmdGet: + s, _ := h.heap.Get(cmd.input.(string)) + cmd.result <- s + case heapCmdUpdate: + i := cmd.input.(*HeapItem) + if i.index < 0 { + cmd.result <- h.heap.Update(i.EvalState) + } else { + cmd.result <- h.heap.UpdatePriority(i.EvalState, i.Priority) + } + updateFront() + } + case <-h.closeChan: + h.activeGoRoutines.Done() + return + } + } + }() + }) +} + +func (h *ConcurrentEvalStateHeap) Get(key string) *HeapItem { + result := make(chan interface{}) + h.cmdChan <- heapCmd{ + cmd: heapCmdGet, + input: key, + result: result, + } + return (<-result).(*HeapItem) +} + +func (h *ConcurrentEvalStateHeap) Chan() <-chan *EvalState { + h.Len() + return h.queueChan +} + +func (h *ConcurrentEvalStateHeap) Len() int { + result := make(chan interface{}) + h.cmdChan <- heapCmd{ + cmd: heapCmdLength, + result: result, + } + return (<-result).(int) +} + +func (h *ConcurrentEvalStateHeap) Front() *HeapItem { + result := make(chan interface{}) + h.cmdChan <- heapCmd{ + cmd: heapCmdFront, + result: result, + } + return (<-result).(*HeapItem) +} + +func (h *ConcurrentEvalStateHeap) Update(s *EvalState) *HeapItem { + if s == nil { + return nil + } + result := make(chan interface{}) + h.cmdChan <- heapCmd{ + cmd: heapCmdUpdate, + result: result, + input: &HeapItem{ + EvalState: s, + index: -1, // abuse index as a signal to not update priority + }, + } + return (<-result).(*HeapItem) +} + +func (h *ConcurrentEvalStateHeap) UpdatePriority(s *EvalState, priority int) *HeapItem { + if s == nil { + return nil + } + result := make(chan interface{}) + h.cmdChan <- heapCmd{ + cmd: heapCmdUpdate, + result: result, + input: &HeapItem{ + EvalState: s, + Priority: priority, + }, + } + return (<-result).(*HeapItem) +} + +func (h *ConcurrentEvalStateHeap) Push(s *EvalState) { + if s == nil { + return + } + h.cmdChan <- heapCmd{ + cmd: heapCmdPush, + input: s, + } +} + +func (h *ConcurrentEvalStateHeap) PushPriority(s *EvalState, priority int) { + if s == nil { + return + } + h.cmdChan <- heapCmd{ + cmd: heapCmdPush, + input: &HeapItem{ + EvalState: s, + Priority: priority, + }, + } +} + +func (h *ConcurrentEvalStateHeap) Pop() *EvalState { + result := make(chan interface{}) + h.cmdChan <- heapCmd{ + cmd: heapCmdPop, + result: result, + } + res := <-result + if res == nil { + return nil + } + return (res).(*EvalState) +} + +func (h *ConcurrentEvalStateHeap) Close() error { + h.closeChan <- true + close(h.closeChan) + h.activeGoRoutines.Wait() + close(h.cmdChan) + close(h.queueChan) + return nil +} + +type HeapItem struct { + *EvalState + Priority int + index int +} + +func (h *HeapItem) GetEvalState() *EvalState { + if h == nil { + return nil + } + return h.EvalState +} + +type EvalStateHeap struct { + heap []*HeapItem + items map[string]*HeapItem + unique bool +} + +func NewEvalStateHeap(unique bool) *EvalStateHeap { + return &EvalStateHeap{ + items: 
map[string]*HeapItem{}, + unique: unique, + } +} +func (h EvalStateHeap) Len() int { + return len(h.heap) +} + +func (h EvalStateHeap) Front() *HeapItem { + if h.Len() == 0 { + return nil + } + return h.heap[0] +} + +func (h EvalStateHeap) Less(i, j int) bool { + it := h.heap[i] + jt := h.heap[j] + + // Check priorities (descending) + if it.Priority > jt.Priority { + return true + } else if it.Priority < jt.Priority { + return false + } + + // If priorities are equal, compare timestamp (ascending) + return ignoreOk(it.Last()).Timestamp.Before(ignoreOk(jt.Last()).Timestamp) +} + +func (h EvalStateHeap) Swap(i, j int) { + h.heap[i], h.heap[j] = h.heap[j], h.heap[i] + h.heap[i].index = i + h.heap[j].index = j +} + +// Use heap.Push +// The signature of push requests an interface{} to adhere to the sort.Interface interface, but will panic if a +// a type other than *EvalState is provided. +func (h *EvalStateHeap) Push(x interface{}) { + switch t := x.(type) { + case *EvalState: + h.pushPriority(t, DefaultPriority) + case *HeapItem: + h.pushPriority(t.EvalState, t.Priority) + default: + panic(fmt.Sprintf("invalid entity submitted: %v", t)) + } +} + +func (h *EvalStateHeap) pushPriority(state *EvalState, priority int) { + if h.unique { + if _, ok := h.items[state.id]; ok { + h.UpdatePriority(state, priority) + return + } + } + el := &HeapItem{ + EvalState: state, + Priority: priority, + index: h.Len(), + } + h.heap = append(h.heap, el) + h.items[state.id] = el +} + +// Use heap.Pop +func (h *EvalStateHeap) Pop() interface{} { + if h.Len() == 0 { + return nil + } + popped := h.heap[h.Len()-1] + delete(h.items, popped.id) + h.heap = h.heap[:h.Len()-1] + return popped.EvalState +} + +func (h *EvalStateHeap) Update(es *EvalState) *HeapItem { + if existing, ok := h.items[es.id]; ok { + existing.EvalState = es + heap.Fix(h, existing.index) + return existing + } + return nil +} + +func (h *EvalStateHeap) UpdatePriority(es *EvalState, priority int) *HeapItem { + if existing, ok := h.items[es.id]; ok { + existing.Priority = priority + existing.EvalState = es + heap.Fix(h, existing.index) + return existing + } + return nil +} + +func (h *EvalStateHeap) Get(key string) (*HeapItem, bool) { + item, ok := h.items[key] + return item, ok +} + +func ignoreOk(r EvalRecord, _ bool) EvalRecord { + return r +} diff --git a/pkg/controller/evaluation_test.go b/pkg/controller/evaluation_test.go index 18a96d70..7bfc1d04 100644 --- a/pkg/controller/evaluation_test.go +++ b/pkg/controller/evaluation_test.go @@ -25,7 +25,7 @@ func TestEvalLog_Append(t *testing.T) { } log.Record(record2) - assert.Equal(t, 2, log.Count()) + assert.Equal(t, 2, log.Len()) last, ok := log.Last() assert.True(t, ok) assert.EqualValues(t, record2, last) @@ -100,11 +100,11 @@ func TestEvalState_Count(t *testing.T) { es := NewEvalState("id", nil) assert.Equal(t, "id", es.ID()) - c := es.Count() + c := es.Len() assert.Equal(t, 0, c) es.Record(dummyRecord) - c = es.Count() + c = es.Len() assert.Equal(t, 1, c) } @@ -123,31 +123,31 @@ func TestEvalState_Logs(t *testing.T) { } func TestEvalCache_GetOrCreate(t *testing.T) { - ec := NewEvalCache() + ec := EvalStore{} id := "foo" - es, ok := ec.Get(id) + es, ok := ec.Load(id) assert.False(t, ok) assert.Empty(t, es) - es = ec.GetOrCreate(id, nil) + es = ec.LoadOrStore(id, nil) assert.Equal(t, id, es.ID()) - es, ok = ec.Get(id) + es, ok = ec.Load(id) assert.True(t, ok) assert.Equal(t, id, es.ID()) } func TestEvalCache_Invalidate(t *testing.T) { - ec := NewEvalCache() + ec := EvalStore{} id := "completedId" - 
ec.Put(NewEvalState(id, nil)) - es, ok := ec.Get(id) + ec.Store(NewEvalState(id, nil)) + es, ok := ec.Load(id) assert.True(t, ok) assert.Equal(t, id, es.ID()) - ec.Del(id) - es, ok = ec.Get(id) + ec.Delete(id) + es, ok = ec.Load(id) assert.False(t, ok) assert.Empty(t, es) } diff --git a/pkg/controller/invocation/actions.go b/pkg/controller/invocation/actions.go index 0ed2a000..a31e1798 100644 --- a/pkg/controller/invocation/actions.go +++ b/pkg/controller/invocation/actions.go @@ -35,7 +35,7 @@ func (a *ActonAbort) Eval(cec controller.EvalContext) controller.Action { } func (a *ActonAbort) Apply() error { - wfiLog.Info("Applying action: abort") + log.Info("Applying action: abort") return a.API.Cancel(a.InvocationID) } @@ -61,7 +61,7 @@ func (a *ActionFail) Eval(cec controller.EvalContext) controller.Action { } func (a *ActionFail) Apply() error { - wfiLog.Infof("Applying action: fail (%v)", a.Err) + log.Infof("Applying action: fail (%v)", a.Err) return a.API.Fail(a.InvocationID, a.Err) } diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index cc6d46ce..7cbd0a3b 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -8,26 +8,28 @@ import ( "github.com/fission/fission-workflows/pkg/api" "github.com/fission/fission-workflows/pkg/api/aggregates" + "github.com/fission/fission-workflows/pkg/api/events" "github.com/fission/fission-workflows/pkg/controller" "github.com/fission/fission-workflows/pkg/controller/expr" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/scheduler" + "github.com/fission/fission-workflows/pkg/util/gopool" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) const ( - NotificationBuffer = 100 - defaultEvalQueueSize = 50 - Name = "invocation" + NotificationBuffer = 100 + maxParallelExecutions = 1000 + Name = "invocation" ) var ( - wfiLog = log.WithField("component", "controller.invocation") + log = logrus.WithField("component", "controller.invocation") // workflow-related metrics invocationStatus = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -66,10 +68,8 @@ type Controller struct { sub *pubsub.Subscription cancelFn context.CancelFunc evalPolicy controller.Rule - evalCache *controller.EvalCache - - // evalQueue is a queue of invocation ids - evalQueue chan string + evalStore controller.EvalStore + evalQueue *controller.ConcurrentEvalStateHeap } func NewController(invokeCache fes.CacheReader, wfCache fes.CacheReader, workflowScheduler *scheduler.WorkflowScheduler, @@ -80,16 +80,11 @@ func NewController(invokeCache fes.CacheReader, wfCache fes.CacheReader, workflo scheduler: workflowScheduler, taskAPI: taskAPI, invocationAPI: invocationAPI, - evalQueue: make(chan string, defaultEvalQueueSize), - evalCache: controller.NewEvalCache(), stateStore: stateStore, - - // States maintains an active cache of currently running invocations, with execution related data. - // This state information is considered preemptable and can be removed or lost at any time. 
- // states: map[string]*ControlState{}, + evalQueue: controller.NewConcurrentEvalStateHeap(true), } - ctr.evalPolicy = defaultPolicy(ctr) + ctr.evalPolicy = defaultPolicy(ctr) return ctr } @@ -112,7 +107,7 @@ func (cr *Controller) Init(sctx context.Context) error { case notification := <-cr.sub.Ch: cr.handleMsg(notification) case <-ctx.Done(): - wfiLog.Debug("Notification listener stopped.") + log.Debug("Notification listener stopped.") return } } @@ -120,14 +115,21 @@ func (cr *Controller) Init(sctx context.Context) error { } // process evaluation queue + pool := gopool.New(maxParallelExecutions) go func(ctx context.Context) { + queue := cr.evalQueue.Chan() for { select { - case eval := <-cr.evalQueue: - go cr.Evaluate(eval) // TODO limit number of goroutines - controller.EvalQueueSize.WithLabelValues("invocation").Dec() + case eval := <-queue: + err := pool.Submit(ctx, func() { + controller.EvalQueueSize.WithLabelValues("invocation").Dec() + cr.Evaluate(eval.ID()) + }) + if err != nil { + log.Errorf("failed to submit invocation %v for execution", eval.ID()) + } case <-ctx.Done(): - wfiLog.Debug("Evaluation queue listener stopped.") + log.Debug("Evaluation queue listener stopped.") return } } @@ -137,48 +139,72 @@ func (cr *Controller) Init(sctx context.Context) error { } func (cr *Controller) handleMsg(msg pubsub.Msg) error { - wfiLog.WithField("labels", msg.Labels()).Debug("Handling invocation notification.") switch n := msg.(type) { case *fes.Notification: cr.Notify(n) default: - wfiLog.WithField("notification", n).Warn("Ignoring unknown notification type") + log.WithField("notification", n).Warn("Ignoring unknown notification type") } return nil } func (cr *Controller) Notify(msg *fes.Notification) error { - wfiLog.WithFields(log.Fields{ + log.WithFields(logrus.Fields{ "notification": msg.EventType, "labels": msg.Labels(), - }).Debug("Handling invocation notification!") - - wfi, ok := msg.Payload.(*aggregates.WorkflowInvocation) - if !ok { - panic(msg) + }).Debugf("Controller event: %v", msg.EventType) + + // TODO avoid struct creations + switch msg.EventType { + case events.TypeOf(&events.InvocationCompleted{}): + fallthrough + case events.TypeOf(&events.InvocationCanceled{}): + fallthrough + case events.TypeOf(&events.InvocationFailed{}): + wfi, ok := msg.Payload.(*aggregates.WorkflowInvocation) + if !ok { + log.Warn("Event did not contain invocation payload", msg) + } + // TODO mark to clean up later instead + cr.stateStore.Delete(wfi.ID()) + cr.evalStore.Delete(wfi.ID()) + log.Infof("Removed invocation %v from eval state", wfi.ID()) + case events.TypeOf(&events.TaskFailed{}): + fallthrough + case events.TypeOf(&events.TaskSucceeded{}): + fallthrough + case events.TypeOf(&events.InvocationCreated{}): + wfi, ok := msg.Payload.(*aggregates.WorkflowInvocation) + if !ok { + panic(msg) + } + es := cr.evalStore.LoadOrStore(wfi.ID(), msg.SpanCtx) + cr.evalQueue.Push(es) + default: + log.Debugf("Controller ignored event type: %v", msg.EventType) } - cr.evalCache.GetOrCreate(wfi.ID(), msg.SpanCtx) - cr.submitEval(wfi.ID()) return nil } func (cr *Controller) Tick(tick uint64) error { // Short loop: invocations the controller is actively tracking var err error - if tick%2 != 0 { - err = cr.checkEvalCaches() + if tick%10 == 0 { + log.Debug("Checking eval store for missing invocations") + err = cr.checkEvalStore() } // Long loop: to check if there are any orphans - if tick%4 != 0 { + if tick%50 == 0 { + log.Debug("Checking model caches for missing invocations") err = cr.checkModelCaches() } 
return err } -func (cr *Controller) checkEvalCaches() error { - for id, state := range cr.evalCache.List() { +func (cr *Controller) checkEvalStore() error { + for _, state := range cr.evalStore.List() { if state.IsFinished() { continue } @@ -191,7 +217,7 @@ func (cr *Controller) checkEvalCaches() error { reevaluateAt := last.Timestamp.Add(time.Duration(100) * time.Millisecond) if time.Now().UnixNano() > reevaluateAt.UnixNano() { controller.EvalRecovered.WithLabelValues(Name, "evalStore").Inc() - cr.submitEval(id) + cr.evalQueue.Push(state) } } return nil @@ -202,7 +228,8 @@ func (cr *Controller) checkModelCaches() error { // Short control loop entities := cr.invokeCache.List() for _, entity := range entities { - if _, ok := cr.evalCache.Get(entity.Id); ok { + // Ignore those that are in the evalStore; those will get picked up by checkEvalStore. + if _, ok := cr.evalStore.Load(entity.Id); ok { continue } @@ -216,33 +243,18 @@ func (cr *Controller) checkModelCaches() error { if !wi.Status.Finished() { span := opentracing.GlobalTracer().StartSpan("recoverFromModelCache") controller.EvalRecovered.WithLabelValues(Name, "cache").Inc() - cr.evalCache.GetOrCreate(wi.ID(), span.Context()) - cr.submitEval(wi.ID()) + es := cr.evalStore.LoadOrStore(wi.ID(), span.Context()) + cr.evalQueue.Push(es) span.Finish() } } return nil } -func (cr *Controller) submitEval(ids ...string) bool { - for _, id := range ids { - select { - case cr.evalQueue <- id: - controller.EvalQueueSize.WithLabelValues(Name).Inc() - return true - // ok - default: - wfiLog.Warnf("Eval queue is full; dropping eval task for '%v'", id) - return false - } - } - return true -} - func (cr *Controller) Evaluate(invocationID string) { start := time.Now() // Fetch and attempt to claim the evaluation - evalState, ok := cr.evalCache.Get(invocationID) + evalState, ok := cr.evalStore.Load(invocationID) if !ok { log.Warnf("Skipping evaluation of unknown invocation: %v", invocationID) return @@ -252,11 +264,11 @@ func (cr *Controller) Evaluate(invocationID string) { defer evalState.Free() default: // TODO provide option to wait for a lock - wfiLog.Debugf("Failed to obtain access to invocation %s", invocationID) + log.Debugf("Failed to obtain access to invocation %s", invocationID) controller.EvalJobs.WithLabelValues(Name, "duplicate").Inc() return } - log.Debugf("evaluating invocation %s", invocationID) + log.Debugf("Evaluating invocation %s", invocationID) // Fetch the workflow invocation for the provided invocation id wfi := aggregates.NewWorkflowInvocation(invocationID) @@ -269,7 +281,7 @@ func (cr *Controller) Evaluate(invocationID string) { } // TODO move to rule if wfi.Status.Finished() { - wfiLog.Debugf("No need to evaluate finished invocation %v", invocationID) + log.Debugf("No need to evaluate finished invocation %v", invocationID) controller.EvalJobs.WithLabelValues(Name, "error").Inc() evalState.Finish(true, "finished") return @@ -318,13 +330,13 @@ func (cr *Controller) Evaluate(invocationID string) { } func (cr *Controller) Close() error { - cr.evalCache.Close() + cr.evalStore.Close() if invokePub, ok := cr.invokeCache.(pubsub.Publisher); ok { err := invokePub.Unsubscribe(cr.sub) if err != nil { - wfiLog.Errorf("Failed to unsubscribe from invocation cache: %v", err) + log.Errorf("Failed to unsubscribe from invocation cache: %v", err) } else { - wfiLog.Info("Unsubscribed from invocation cache") + log.Info("Unsubscribed from invocation cache") } } diff --git a/pkg/controller/invocation/rules.go b/pkg/controller/invocation/rules.go index 
d96e16b9..1bcb11be 100644 --- a/pkg/controller/invocation/rules.go +++ b/pkg/controller/invocation/rules.go @@ -10,7 +10,6 @@ import ( "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" "github.com/golang/protobuf/ptypes" - log "github.com/sirupsen/logrus" ) // diff --git a/pkg/controller/rules.go b/pkg/controller/rules.go index c479ae93..75c5fd99 100644 --- a/pkg/controller/rules.go +++ b/pkg/controller/rules.go @@ -49,7 +49,7 @@ type RuleExceededErrorCount struct { func (el *RuleExceededErrorCount) Eval(ec EvalContext) Action { var errorCount int state := ec.EvalState() - for i := state.Count() - 1; i >= 0; i-- { + for i := state.Len() - 1; i >= 0; i-- { record, ok := state.Get(i) if !ok { panic("Illegal modification") diff --git a/pkg/controller/workflow/controller.go b/pkg/controller/workflow/controller.go index ae1e7a39..281a1ebf 100644 --- a/pkg/controller/workflow/controller.go +++ b/pkg/controller/workflow/controller.go @@ -11,6 +11,7 @@ import ( "github.com/fission/fission-workflows/pkg/controller" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/types" + "github.com/fission/fission-workflows/pkg/util/gopool" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" @@ -19,15 +20,16 @@ import ( ) const ( - NotificationBuffer = 100 - defaultEvalQueueSize = 50 - Name = "workflow" + NotificationBuffer = 100 + defaultEvalQueueSize = 50 + Name = "workflow" + maxParallelExecutions = 100 ) // TODO add hard limits (cache size, max concurrent invocation) var ( - wfLog = logrus.WithField("component", "controller.workflow") + log = logrus.WithField("component", "controller.workflow") workflowProcessDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "workflows", @@ -56,7 +58,7 @@ type Controller struct { sub *pubsub.Subscription cancelFn context.CancelFunc evalQueue chan string - evalCache *controller.EvalCache + evalCache controller.EvalStore evalPolicy controller.Rule } @@ -65,7 +67,7 @@ func NewController(wfCache fes.CacheReader, wfAPI *api.Workflow) *Controller { wfCache: wfCache, api: wfAPI, evalQueue: make(chan string, defaultEvalQueueSize), - evalCache: controller.NewEvalCache(), + evalCache: controller.EvalStore{}, } ctr.evalPolicy = defaultPolicy(ctr) return ctr @@ -90,7 +92,7 @@ func (c *Controller) Init(sctx context.Context) error { case notification := <-c.sub.Ch: c.handleMsg(notification) case <-ctx.Done(): - wfLog.Debug("Notification listener closed.") + log.Debug("Notification listener closed.") return } } @@ -98,14 +100,17 @@ func (c *Controller) Init(sctx context.Context) error { } // process evaluation queue + pool := gopool.New(maxParallelExecutions) go func(ctx context.Context) { for { select { case eval := <-c.evalQueue: - controller.EvalQueueSize.WithLabelValues(Name).Dec() - go c.Evaluate(eval) // TODO limit number of goroutines + pool.Submit(ctx, func() { + controller.EvalQueueSize.WithLabelValues(Name).Dec() + c.Evaluate(eval) + }) case <-ctx.Done(): - wfLog.Debug("Evaluation queue listener stopped.") + log.Debug("Evaluation queue listener stopped.") return } } @@ -115,12 +120,12 @@ func (c *Controller) Init(sctx context.Context) error { } func (c *Controller) handleMsg(msg pubsub.Msg) error { - wfLog.WithField("labels", msg.Labels()).Debug("Handling invocation notification.") + log.WithField("labels", msg.Labels()).Debug("Handling invocation notification.") switch n 
:= msg.(type) { case *fes.Notification: c.Notify(n) default: - wfLog.WithField("notification", n).Warn("Ignoring unknown notification type") + log.WithField("notification", n).Warn("Ignoring unknown notification type") } return nil } @@ -138,7 +143,7 @@ func (c *Controller) Notify(msg *fes.Notification) error { } // If the workflow is not yet tracked create an evalState for it. - c.evalCache.GetOrCreate(wf.ID(), msg.SpanCtx) + c.evalCache.LoadOrStore(wf.ID(), msg.SpanCtx) c.submitEval(wf.ID()) return nil } @@ -146,7 +151,7 @@ func (c *Controller) Notify(msg *fes.Notification) error { func (c *Controller) Evaluate(workflowID string) { start := time.Now() // Fetch and attempt to claim the evaluation - evalState, ok := c.evalCache.Get(workflowID) + evalState, ok := c.evalCache.Load(workflowID) if !ok { logrus.Warnf("Skipping evaluation of unknown workflow: %v", workflowID) return @@ -156,11 +161,11 @@ func (c *Controller) Evaluate(workflowID string) { defer evalState.Free() default: // TODO provide option to wait for a lock - wfLog.Debugf("Failed to obtain access to workflow %s", workflowID) + log.Debugf("Failed to obtain access to workflow %s", workflowID) controller.EvalJobs.WithLabelValues(Name, "duplicate").Inc() return } - wfLog.Debugf("evaluating workflow %s", workflowID) + log.Debugf("evaluating workflow %s", workflowID) // Fetch the workflow relevant to the invocation wf := aggregates.NewWorkflow(workflowID) @@ -188,7 +193,7 @@ func (c *Controller) Evaluate(workflowID string) { // Execute action err = action.Apply() if err != nil { - wfLog.Errorf("Action '%T' failed: %v", action, err) + log.Errorf("Action '%T' failed: %v", action, err) record.Error = err } controller.EvalJobs.WithLabelValues(Name, "action").Inc() @@ -209,9 +214,9 @@ func (c *Controller) Close() error { if invokePub, ok := c.wfCache.(pubsub.Publisher); ok { err := invokePub.Unsubscribe(c.sub) if err != nil { - wfLog.Errorf("Failed to unsubscribe from workflow cache: %v", err) + log.Errorf("Failed to unsubscribe from workflow cache: %v", err) } else { - wfLog.Info("Unsubscribed from workflow cache") + log.Info("Unsubscribed from workflow cache") } } @@ -227,7 +232,7 @@ func (c *Controller) submitEval(ids ...string) bool { return true // ok default: - wfLog.Warnf("Eval queue is full; dropping eval task for '%v'", id) + log.Warnf("Eval queue is full; dropping eval task for '%v'", id) return false } } diff --git a/pkg/controller/workflow/rules.go b/pkg/controller/workflow/rules.go index f49db9a9..f2e0639a 100644 --- a/pkg/controller/workflow/rules.go +++ b/pkg/controller/workflow/rules.go @@ -67,7 +67,7 @@ func EnsureWorkflowContext(cec controller.EvalContext) EvalContext { } type RuleRemoveIfDeleted struct { - evalCache *controller.EvalCache + evalCache controller.EvalStore } func (r *RuleRemoveIfDeleted) Eval(cec controller.EvalContext) controller.Action { diff --git a/pkg/util/gopool/gopool.go b/pkg/util/gopool/gopool.go new file mode 100644 index 00000000..30949577 --- /dev/null +++ b/pkg/util/gopool/gopool.go @@ -0,0 +1,39 @@ +package gopool + +import ( + "context" + "sync/atomic" + + "golang.org/x/sync/semaphore" +) + +type GoPool struct { + routines *semaphore.Weighted + activeRoutines int64 + maxRoutines int64 +} + +func New(maxRoutines int64) *GoPool { + return &GoPool{ + routines: semaphore.NewWeighted(maxRoutines), + } +} + +func (g *GoPool) Active() int64 { + return g.activeRoutines +} + +func (g *GoPool) Submit(ctx context.Context, fn func()) error { + if err := g.routines.Acquire(ctx, 1); err != nil { + 
return err + } + go g.wrapRoutine(fn) + return nil +} + +func (g *GoPool) wrapRoutine(fn func()) { + atomic.AddInt64(&g.activeRoutines, 1) + fn() + atomic.AddInt64(&g.activeRoutines, -1) + g.routines.Release(1) +} From 8a414edf05443cbdaa8c41dadd1acafc467b5cef Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Fri, 13 Jul 2018 12:04:49 +0200 Subject: [PATCH 2/8] Added test to verify parallel task invocations --- test/integration/bundle/bundle_test.go | 60 ++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go index 00fbab4c..2ad7686d 100644 --- a/test/integration/bundle/bundle_test.go +++ b/test/integration/bundle/bundle_test.go @@ -14,6 +14,7 @@ import ( "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" "github.com/fission/fission-workflows/test/integration" + "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/empty" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -291,6 +292,65 @@ func TestInlineWorkflowInvocation(t *testing.T) { assert.NoError(t, err) } +func TestParallelInvocation(t *testing.T) { + ctx := context.Background() + conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) + if err != nil { + panic(err) + } + cl := apiserver.NewWorkflowAPIClient(conn) + wi := apiserver.NewWorkflowInvocationAPIClient(conn) + + wfSpec := types.NewWorkflowSpec() + + taskSpec := &types.TaskSpec{ + FunctionRef: builtin.Sleep, + Inputs: typedvalues.Input("2s"), + } + + wfSpec.AddTask("p1", taskSpec) + wfSpec.AddTask("p2", taskSpec) + wfSpec.AddTask("p3", taskSpec) + wfSpec.AddTask("p4", taskSpec) + wfSpec.AddTask("p5", taskSpec) + wfSpec.AddTask("await", &types.TaskSpec{ + FunctionRef: builtin.Sleep, + Inputs: typedvalues.Input("1s"), + Requires: types.Require("p1", "p2", "p3", "p4", "p5"), + }) + wfSpec.SetOutput("await") + + wfResp, err := cl.Create(ctx, wfSpec) + defer cl.Delete(ctx, wfResp) + assert.NoError(t, err, err) + assert.NotNil(t, wfResp) + assert.NotEmpty(t, wfResp.Id) + + wiSpec := types.NewWorkflowInvocationSpec(wfResp.Id) + wfi, err := wi.InvokeSync(ctx, wiSpec) + assert.NoError(t, err) + assert.Empty(t, wfi.Status.DynamicTasks) + assert.True(t, wfi.Status.Finished()) + assert.True(t, wfi.Status.Successful()) + assert.Equal(t, len(wfSpec.Tasks), len(wfi.Status.Tasks)) + + // Check if pN tasks were run in parallel + var minStartTime, maxStartTime time.Time + for _, task := range wfi.Status.Tasks { + if strings.HasPrefix(task.Spec.TaskId, "p") { + tt, err := ptypes.Timestamp(task.Metadata.CreatedAt) + assert.NoError(t, err) + if minStartTime == (time.Time{}) || tt.Before(minStartTime) { + minStartTime = tt + } + if maxStartTime == (time.Time{}) || tt.After(maxStartTime) { + maxStartTime = tt + } + } + } + assert.InDelta(t, 0, maxStartTime.Sub(minStartTime).Nanoseconds(), float64(time.Second.Nanoseconds())) +} + func TestLongRunningWorkflowInvocation(t *testing.T) { ctx, cancelFn := context.WithTimeout(context.Background(), TestTimeout) defer cancelFn() From fc02ace72d3de788db5719fd90dbc7e127d74f16 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Fri, 13 Jul 2018 15:15:02 +0200 Subject: [PATCH 3/8] simplify evalstore implementation --- pkg/controller/actions.go | 2 +- pkg/controller/evaluation.go | 37 ++++---------------- pkg/controller/invocation/controller.go | 4 ++- pkg/controller/workflow/controller.go | 4 +-- pkg/controller/workflow/rules.go | 2 +- pkg/util/gopool/gopool.go | 1 + 
pkg/util/util.go | 46 +++++++++++++++++++++++++ 7 files changed, 61 insertions(+), 35 deletions(-) diff --git a/pkg/controller/actions.go b/pkg/controller/actions.go index 1e763cc4..1397b6f0 100644 --- a/pkg/controller/actions.go +++ b/pkg/controller/actions.go @@ -35,7 +35,7 @@ func (a *ActionSkip) Eval(rule EvalContext) Action { } type ActionRemoveFromEvalCache struct { - EvalCache EvalStore + EvalCache *EvalStore ID string } diff --git a/pkg/controller/evaluation.go b/pkg/controller/evaluation.go index 9577dc71..cd3f6ad7 100644 --- a/pkg/controller/evaluation.go +++ b/pkg/controller/evaluation.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" "sync" - "sync/atomic" "time" "github.com/opentracing/opentracing-go" @@ -16,33 +15,16 @@ import ( // EvalStore allows storing and retrieving EvalStates in a thread-safe way. type EvalStore struct { - states sync.Map - cachedLen *int32 -} - -func (e *EvalStore) Len() int { - if e.cachedLen == nil { - var count int32 - e.states.Range(func(k, v interface{}) bool { - count++ - return true - }) - atomic.StoreInt32(e.cachedLen, count) - return int(count) - } - return int(atomic.LoadInt32(e.cachedLen)) + mp sync.Map } func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) *EvalState { - s, loaded := e.states.LoadOrStore(id, NewEvalState(id, spanCtx)) - if !loaded && e.cachedLen != nil { - atomic.AddInt32(e.cachedLen, 1) - } + s, _ := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx)) return s.(*EvalState) } func (e *EvalStore) Load(id string) (*EvalState, bool) { - s, ok := e.states.Load(id) + s, ok := e.mp.Load(id) if !ok { return nil, false } @@ -50,18 +32,16 @@ func (e *EvalStore) Load(id string) (*EvalState, bool) { } func (e *EvalStore) Store(state *EvalState) { - e.states.Store(state.id, state) - e.cachedLen = nil // We are not sure if an entry was replaced or added + e.mp.Store(state.id, state) } func (e *EvalStore) Delete(id string) { - e.states.Delete(id) - e.cachedLen = nil // We are not sure if an entry was removed + e.mp.Delete(id) } func (e *EvalStore) List() map[string]*EvalState { results := map[string]*EvalState{} - e.states.Range(func(k, v interface{}) bool { + e.mp.Range(func(k, v interface{}) bool { results[k.(string)] = v.(*EvalState) return true }) @@ -307,8 +287,7 @@ func (h *ConcurrentEvalStateHeap) Init() { h.init.Do(func() { front := make(chan *EvalState, 1) updateFront := func() { - es := h.heap.Front().GetEvalState() - front <- es + front <- h.heap.Front().GetEvalState() } // Channel supplier @@ -321,8 +300,6 @@ func (h *ConcurrentEvalStateHeap) Init() { case h.queueChan <- next: // Queue item has been fetched; get next one. next = nil - // heap.Pop(h.heap) // RACE with command handler - // updateFront() h.Pop() case u := <-front: // There has been an update to the queue item; replace it. 
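A minimal usage sketch of how the pieces introduced so far compose: the EvalStore tracks EvalStates by ID, the ConcurrentEvalStateHeap orders pending states and hands them out through Chan(), and gopool bounds how many evaluations run at once. The invocation ID and pool size below are illustrative only, and the nil span context follows the unit tests.

package main

import (
	"context"
	"fmt"

	"github.com/fission/fission-workflows/pkg/controller"
	"github.com/fission/fission-workflows/pkg/util/gopool"
)

func main() {
	// Track evaluation state per invocation ID (nil span context, as in the unit tests).
	store := &controller.EvalStore{}
	es := store.LoadOrStore("example-invocation", nil) // illustrative ID

	// Queue the state for evaluation; unique=true means re-pushing the same ID
	// updates the existing heap entry instead of adding a duplicate.
	queue := controller.NewConcurrentEvalStateHeap(true)
	queue.Push(es)

	// Evaluate queued states with bounded parallelism.
	pool := gopool.New(4) // illustrative pool size
	ctx := context.Background()
	next := <-queue.Chan() // blocks until the highest-priority state is available
	if err := pool.Submit(ctx, func() {
		fmt.Println("evaluating", next.ID())
	}); err != nil {
		fmt.Println("failed to submit evaluation:", err)
	}
}

This mirrors the controller's evaluation loop, where pool.Submit replaces the earlier unbounded "go cr.Evaluate(...)" calls so the number of concurrently evaluated invocations stays capped at maxParallelExecutions.
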
diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index 7cbd0a3b..9dd77093 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -68,7 +68,7 @@ type Controller struct { sub *pubsub.Subscription cancelFn context.CancelFunc evalPolicy controller.Rule - evalStore controller.EvalStore + evalStore *controller.EvalStore evalQueue *controller.ConcurrentEvalStateHeap } @@ -81,6 +81,7 @@ func NewController(invokeCache fes.CacheReader, wfCache fes.CacheReader, workflo taskAPI: taskAPI, invocationAPI: invocationAPI, stateStore: stateStore, + evalStore: &controller.EvalStore{}, evalQueue: controller.NewConcurrentEvalStateHeap(true), } @@ -323,6 +324,7 @@ func (cr *Controller) Evaluate(invocationID string) { controller.EvalDuration.WithLabelValues(Name, fmt.Sprintf("%T", action)).Observe(float64(time.Now().Sub(start))) if wfi.GetStatus().Finished() { + cr.evalStore.Delete(wfi.ID()) t, _ := ptypes.Timestamp(wfi.GetMetadata().GetCreatedAt()) invocationDuration.Observe(float64(time.Now().Sub(t))) } diff --git a/pkg/controller/workflow/controller.go b/pkg/controller/workflow/controller.go index 281a1ebf..e0941211 100644 --- a/pkg/controller/workflow/controller.go +++ b/pkg/controller/workflow/controller.go @@ -58,7 +58,7 @@ type Controller struct { sub *pubsub.Subscription cancelFn context.CancelFunc evalQueue chan string - evalCache controller.EvalStore + evalCache *controller.EvalStore evalPolicy controller.Rule } @@ -67,7 +67,7 @@ func NewController(wfCache fes.CacheReader, wfAPI *api.Workflow) *Controller { wfCache: wfCache, api: wfAPI, evalQueue: make(chan string, defaultEvalQueueSize), - evalCache: controller.EvalStore{}, + evalCache: &controller.EvalStore{}, } ctr.evalPolicy = defaultPolicy(ctr) return ctr diff --git a/pkg/controller/workflow/rules.go b/pkg/controller/workflow/rules.go index f2e0639a..5c873f3f 100644 --- a/pkg/controller/workflow/rules.go +++ b/pkg/controller/workflow/rules.go @@ -67,7 +67,7 @@ func EnsureWorkflowContext(cec controller.EvalContext) EvalContext { } type RuleRemoveIfDeleted struct { - evalCache controller.EvalStore + evalCache *controller.EvalStore } func (r *RuleRemoveIfDeleted) Eval(cec controller.EvalContext) controller.Action { diff --git a/pkg/util/gopool/gopool.go b/pkg/util/gopool/gopool.go index 30949577..8f34ffae 100644 --- a/pkg/util/gopool/gopool.go +++ b/pkg/util/gopool/gopool.go @@ -1,3 +1,4 @@ +// package gopool provides functionality for bounded parallelism package gopool import ( diff --git a/pkg/util/util.go b/pkg/util/util.go index d885a64d..e3dfea7b 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "sync" + "sync/atomic" "time" "github.com/satori/go.uuid" @@ -60,3 +61,48 @@ func Truncate(val interface{}, maxLen int) string { affix := fmt.Sprintf("", len(s)) return s[:len(s)-len(affix)] + affix } + +// SyncMapLen is simply a sync.Map with options to retrieve the length of it. 
+type SyncMapLen struct { + mp sync.Map + cachedLen *int32 +} + +func (e *SyncMapLen) Len() int { + if e.cachedLen == nil { + var count int32 + e.mp.Range(func(k, v interface{}) bool { + count++ + return true + }) + atomic.StoreInt32(e.cachedLen, count) + return int(count) + } + return int(atomic.LoadInt32(e.cachedLen)) +} + +func (e *SyncMapLen) LoadOrStore(key interface{}, value interface{}) (actual interface{}, loaded bool) { + actual, loaded = e.mp.LoadOrStore(key, value) + if !loaded && e.cachedLen != nil { + atomic.AddInt32(e.cachedLen, 1) + } + return actual, loaded +} + +func (e *SyncMapLen) Load(key interface{}) (value interface{}, ok bool) { + return e.mp.Load(key) +} + +func (e *SyncMapLen) Store(key, value interface{}) { + e.mp.Store(key, value) + e.cachedLen = nil // We are not sure if an entry was replaced or added +} + +func (e *SyncMapLen) Delete(id string) { + e.mp.Delete(id) + e.cachedLen = nil // We are not sure if an entry was removed +} + +func (e *SyncMapLen) Range(f func(key interface{}, value interface{}) bool) { + e.mp.Range(f) +} From 0da930f537b8456ad205722eeac4870a2c35012b Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 26 Jul 2018 17:18:36 +0200 Subject: [PATCH 4/8] Remove command channel approach in pqueue - The original implementation was too prone to bugs, and too complicated to use --- pkg/controller/controller.go | 2 +- pkg/controller/evaluation.go | 267 +++++++++--------------- pkg/controller/invocation/controller.go | 8 +- pkg/fnenv/native/builtin/builtin.go | 2 +- test/integration/bundle/bundle_test.go | 49 +++++ 5 files changed, 154 insertions(+), 174 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e44c8f79..a698d556 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -45,7 +45,7 @@ var ( Namespace: "workflows", Subsystem: "controller_workflow", Name: "eval_queue_size", - Help: "A gauge of the evaluation queue cachedLen", + Help: "A gauge of the evaluation queue size", }, []string{"controller"}) ) diff --git a/pkg/controller/evaluation.go b/pkg/controller/evaluation.go index cd3f6ad7..0166a94b 100644 --- a/pkg/controller/evaluation.go +++ b/pkg/controller/evaluation.go @@ -13,6 +13,10 @@ import ( "github.com/sirupsen/logrus" ) +const ( + DefaultPriority = 0 +) + // EvalStore allows storing and retrieving EvalStates in a thread-safe way. type EvalStore struct { mp sync.Map @@ -244,230 +248,157 @@ func (e *EvalLog) Record(record EvalRecord) { *e = append(*e, record) } -type heapCmdType string - -const ( - heapCmdPush heapCmdType = "push" - heapCmdFront heapCmdType = "front" - heapCmdPop heapCmdType = "pop" - heapCmdUpdate heapCmdType = "update" - heapCmdGet heapCmdType = "get" - heapCmdLength heapCmdType = "len" - DefaultPriority = 0 -) - -type heapCmd struct { - cmd heapCmdType - input interface{} - result chan<- interface{} -} - +// ConcurrentEvalStateHeap is a thread-safe adaption of the EvalStateHeap type ConcurrentEvalStateHeap struct { - heap *EvalStateHeap - cmdChan chan heapCmd - closeChan chan bool - queueChan chan *EvalState - init sync.Once - activeGoRoutines sync.WaitGroup + heap *EvalStateHeap + qLock sync.RWMutex // Note: pop is altering state, so it needs a write _lock. 
+ fLock sync.Mutex + init sync.Once + popChan chan *EvalState + frontChan chan *EvalState + updateCond sync.Cond + closeChan chan struct{} } func NewConcurrentEvalStateHeap(unique bool) *ConcurrentEvalStateHeap { h := &ConcurrentEvalStateHeap{ heap: NewEvalStateHeap(unique), - cmdChan: make(chan heapCmd, 50), - closeChan: make(chan bool), - queueChan: make(chan *EvalState), + popChan: make(chan *EvalState), + frontChan: make(chan *EvalState), + closeChan: make(chan struct{}), } h.Init() - heap.Init(h.heap) return h } func (h *ConcurrentEvalStateHeap) Init() { h.init.Do(func() { - front := make(chan *EvalState, 1) - updateFront := func() { - front <- h.heap.Front().GetEvalState() - } - - // Channel supplier go func() { - h.activeGoRoutines.Add(1) - var next *EvalState + var element *EvalState for { - if next != nil { + if element == nil { select { - case h.queueChan <- next: - // Queue item has been fetched; get next one. - next = nil - h.Pop() - case u := <-front: - // There has been an update to the queue item; replace it. - next = u + case element = <-h.frontChan: case <-h.closeChan: - h.activeGoRoutines.Done() return } + } else { - // There is no current queue item, so we only listen to updates select { - case u := <-front: - next = u + case h.popChan <- element: + h.fLock.Lock() + h.qLock.Lock() + element = nil + heap.Pop(h.heap) + front := h.heap.Front() + if front != nil { + element = front.GetEvalState() + } + h.qLock.Unlock() + h.fLock.Unlock() + case element = <-h.frontChan: case <-h.closeChan: - h.activeGoRoutines.Done() return } } } }() - - // Command handler - go func() { - h.activeGoRoutines.Add(1) - updateFront() - for { - select { - case cmd := <-h.cmdChan: - switch cmd.cmd { - case heapCmdFront: - cmd.result <- h.heap.Front() - case heapCmdLength: - cmd.result <- h.heap.Len() - case heapCmdPush: - heap.Push(h.heap, cmd.input) - updateFront() - case heapCmdPop: - cmd.result <- heap.Pop(h.heap) - updateFront() - case heapCmdGet: - s, _ := h.heap.Get(cmd.input.(string)) - cmd.result <- s - case heapCmdUpdate: - i := cmd.input.(*HeapItem) - if i.index < 0 { - cmd.result <- h.heap.Update(i.EvalState) - } else { - cmd.result <- h.heap.UpdatePriority(i.EvalState, i.Priority) - } - updateFront() - } - case <-h.closeChan: - h.activeGoRoutines.Done() - return - } - } - }() }) } -func (h *ConcurrentEvalStateHeap) Get(key string) *HeapItem { - result := make(chan interface{}) - h.cmdChan <- heapCmd{ - cmd: heapCmdGet, - input: key, - result: result, +func (h *ConcurrentEvalStateHeap) Pop() *EvalState { + h.lock() + if h.heap.Len() == 0 { + return nil + } + popped := heap.Pop(h.heap) + h.unlock() + return (popped).(*EvalState) +} + +// lock write-locks the queue, flushing the channel +func (h *ConcurrentEvalStateHeap) lock() { + h.fLock.Lock() + h.frontChan <- nil // clear the pop channel + h.qLock.Lock() +} + +// unlock write-unlocks the queue, filling the popChan +func (h *ConcurrentEvalStateHeap) unlock() { + fmt.Println("len: ", h.heap.Len()) + front := h.heap.Front() + if front != nil { + h.frontChan <- front.GetEvalState() } - return (<-result).(*HeapItem) + h.qLock.Unlock() + h.fLock.Unlock() } func (h *ConcurrentEvalStateHeap) Chan() <-chan *EvalState { - h.Len() - return h.queueChan + return h.popChan } func (h *ConcurrentEvalStateHeap) Len() int { - result := make(chan interface{}) - h.cmdChan <- heapCmd{ - cmd: heapCmdLength, - result: result, - } - return (<-result).(int) + h.qLock.RLock() + count := h.heap.Len() + h.qLock.RUnlock() + return count +} + +func (h 
*ConcurrentEvalStateHeap) Get(key string) *HeapItem { + h.qLock.RLock() + item := h.heap.Get(key) + h.qLock.RUnlock() + return item } func (h *ConcurrentEvalStateHeap) Front() *HeapItem { - result := make(chan interface{}) - h.cmdChan <- heapCmd{ - cmd: heapCmdFront, - result: result, - } - return (<-result).(*HeapItem) + h.qLock.RLock() + front := h.heap.Front() + h.qLock.RUnlock() + return front } func (h *ConcurrentEvalStateHeap) Update(s *EvalState) *HeapItem { if s == nil { return nil } - result := make(chan interface{}) - h.cmdChan <- heapCmd{ - cmd: heapCmdUpdate, - result: result, - input: &HeapItem{ - EvalState: s, - index: -1, // abuse index as a signal to not update priority - }, - } - return (<-result).(*HeapItem) + h.lock() + updated := h.heap.Update(s) + h.unlock() + return updated } func (h *ConcurrentEvalStateHeap) UpdatePriority(s *EvalState, priority int) *HeapItem { if s == nil { return nil } - result := make(chan interface{}) - h.cmdChan <- heapCmd{ - cmd: heapCmdUpdate, - result: result, - input: &HeapItem{ - EvalState: s, - Priority: priority, - }, - } - return (<-result).(*HeapItem) + h.lock() + updated := h.heap.UpdatePriority(s, priority) + h.unlock() + return updated } func (h *ConcurrentEvalStateHeap) Push(s *EvalState) { if s == nil { return } - h.cmdChan <- heapCmd{ - cmd: heapCmdPush, - input: s, - } + h.lock() + heap.Push(h.heap, s) + h.unlock() } func (h *ConcurrentEvalStateHeap) PushPriority(s *EvalState, priority int) { if s == nil { return } - h.cmdChan <- heapCmd{ - cmd: heapCmdPush, - input: &HeapItem{ - EvalState: s, - Priority: priority, - }, - } -} - -func (h *ConcurrentEvalStateHeap) Pop() *EvalState { - result := make(chan interface{}) - h.cmdChan <- heapCmd{ - cmd: heapCmdPop, - result: result, - } - res := <-result - if res == nil { - return nil + item := &HeapItem{ + EvalState: s, + Priority: priority, } - return (res).(*EvalState) -} - -func (h *ConcurrentEvalStateHeap) Close() error { - h.closeChan <- true - close(h.closeChan) - h.activeGoRoutines.Wait() - close(h.cmdChan) - close(h.queueChan) - return nil + h.lock() + heap.Push(h.heap, item) + h.unlock() } type HeapItem struct { @@ -490,11 +421,14 @@ type EvalStateHeap struct { } func NewEvalStateHeap(unique bool) *EvalStateHeap { - return &EvalStateHeap{ + h := &EvalStateHeap{ items: map[string]*HeapItem{}, unique: unique, } + heap.Init(h) + return h } + func (h EvalStateHeap) Len() int { return len(h.heap) } @@ -527,7 +461,7 @@ func (h EvalStateHeap) Swap(i, j int) { h.heap[j].index = j } -// Use heap.Push +// Do not use directly, use heap.Push! // The signature of push requests an interface{} to adhere to the sort.Interface interface, but will panic if a // a type other than *EvalState is provided. func (h *EvalStateHeap) Push(x interface{}) { @@ -557,11 +491,8 @@ func (h *EvalStateHeap) pushPriority(state *EvalState, priority int) { h.items[state.id] = el } -// Use heap.Pop +// Do not use directly, use heap.Pop! 
func (h *EvalStateHeap) Pop() interface{} { - if h.Len() == 0 { - return nil - } popped := h.heap[h.Len()-1] delete(h.items, popped.id) h.heap = h.heap[:h.Len()-1] @@ -587,9 +518,9 @@ func (h *EvalStateHeap) UpdatePriority(es *EvalState, priority int) *HeapItem { return nil } -func (h *EvalStateHeap) Get(key string) (*HeapItem, bool) { - item, ok := h.items[key] - return item, ok +func (h *EvalStateHeap) Get(key string) *HeapItem { + item, _ := h.items[key] + return item } func ignoreOk(r EvalRecord, _ bool) EvalRecord { diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index 9dd77093..438da65e 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -108,7 +108,7 @@ func (cr *Controller) Init(sctx context.Context) error { case notification := <-cr.sub.Ch: cr.handleMsg(notification) case <-ctx.Done(): - log.Debug("Notification listener stopped.") + log.Info("Notification listener stopped.") return } } @@ -130,7 +130,7 @@ func (cr *Controller) Init(sctx context.Context) error { log.Errorf("failed to submit invocation %v for execution", eval.ID()) } case <-ctx.Done(): - log.Debug("Evaluation queue listener stopped.") + log.Info("Evaluation queue listener stopped.") return } } @@ -191,7 +191,6 @@ func (cr *Controller) Tick(tick uint64) error { // Short loop: invocations the controller is actively tracking var err error if tick%10 == 0 { - log.Debug("Checking eval store for missing invocations") err = cr.checkEvalStore() } @@ -205,7 +204,7 @@ func (cr *Controller) Tick(tick uint64) error { } func (cr *Controller) checkEvalStore() error { - for _, state := range cr.evalStore.List() { + for id, state := range cr.evalStore.List() { if state.IsFinished() { continue } @@ -218,6 +217,7 @@ func (cr *Controller) checkEvalStore() error { reevaluateAt := last.Timestamp.Add(time.Duration(100) * time.Millisecond) if time.Now().UnixNano() > reevaluateAt.UnixNano() { controller.EvalRecovered.WithLabelValues(Name, "evalStore").Inc() + log.Infof("Adding missing invocation %v to the queue", id) cr.evalQueue.Push(state) } } diff --git a/pkg/fnenv/native/builtin/builtin.go b/pkg/fnenv/native/builtin/builtin.go index 1c3cbeb1..a6dd7e9d 100644 --- a/pkg/fnenv/native/builtin/builtin.go +++ b/pkg/fnenv/native/builtin/builtin.go @@ -36,7 +36,7 @@ func ensureInput(inputs map[string]*types.TypedValue, key string, validTypes ... if len(validTypes) > 0 { valid := typedvalues.IsType(tv, validTypes...) 
if !valid { - return nil, fmt.Errorf("input '%s' is not a valid type (expected: %v)", key, validTypes) + return nil, fmt.Errorf("input '%s' is not a valid type (expected: %v, was: %T)", key, validTypes, tv.Type) } } diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go index 2ad7686d..dcf52dc9 100644 --- a/test/integration/bundle/bundle_test.go +++ b/test/integration/bundle/bundle_test.go @@ -567,6 +567,55 @@ func TestInvocationWithForcedOutputs(t *testing.T) { assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetOutput().GetValue())) } +func TestDeeplyNestedInvocation(t *testing.T) { + ctx := context.Background() + conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) + if err != nil { + panic(err) + } + cl := apiserver.NewWorkflowAPIClient(conn) + wi := apiserver.NewWorkflowInvocationAPIClient(conn) + + // Test workflow creation + wfSpec := &types.WorkflowSpec{ + ApiVersion: types.WorkflowAPIVersion, + OutputTask: "CountUntil", + Tasks: map[string]*types.TaskSpec{ + "CountUntil": { + FunctionRef: builtin.While, + Inputs: types.Inputs{ + builtin.WhileInputExpr: typedvalues.MustParse("{ !task().Inputs._prev || task().Inputs._prev < 5 }"), + builtin.WhileInputLimit: typedvalues.MustParse(10), + builtin.WhileInputAction: typedvalues.MustParse(&types.TaskSpec{ + FunctionRef: builtin.Noop, + Inputs: types.Inputs{ + builtin.NoopInput: typedvalues.MustParse("{ (task().Inputs._prev || 0) + 1 }"), + }, + }), + }, + }, + }, + } + wfResp, err := cl.Create(ctx, wfSpec) + defer cl.Delete(ctx, wfResp) + + assert.NoError(t, err) + assert.NotNil(t, wfResp) + assert.NotEmpty(t, wfResp.Id) + + wiSpec := &types.WorkflowInvocationSpec{ + WorkflowId: wfResp.Id, + } + wfi, err := wi.InvokeSync(ctx, wiSpec) + assert.NoError(t, err) + assert.NotEmpty(t, wfi.Status.DynamicTasks) + assert.True(t, wfi.Status.Finished()) + assert.True(t, wfi.Status.Successful()) + + output := typedvalues.MustFormat(wfi.Status.Output) + assert.Equal(t, 5, output) +} + func setup() (apiserver.WorkflowAPIClient, apiserver.WorkflowInvocationAPIClient) { conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) if err != nil { From 388097fd96a94e7c5bbdfd34bcb3e7dcc8bd598f Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 26 Jul 2018 18:54:29 +0200 Subject: [PATCH 5/8] Fixed goroutine leak in expr The resolving timeout (that kills a expression if it takes longer than 100ms) could get stuck on a non-responsive channel. 
This is likely when the 'vm' was garbage collected before the interrupt occured --- pkg/controller/expr/expr.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/controller/expr/expr.go b/pkg/controller/expr/expr.go index 8d7b7ed3..b46d85b0 100644 --- a/pkg/controller/expr/expr.go +++ b/pkg/controller/expr/expr.go @@ -98,8 +98,12 @@ func (oe *JavascriptExpressionParser) resolveExpr(rootScope interface{}, current go func() { <-time.After(ResolvingTimeout) - scoped.Interrupt <- func() { + select { + case scoped.Interrupt <- func() { panic(ErrTimeOut) + }: + default: + // evaluation has already been interrupted / quit } }() From b4cbaae6b7d085acba82fc45aae295c41b21cead Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Fri, 27 Jul 2018 11:27:45 +0200 Subject: [PATCH 6/8] Fix race condition gopool --- pkg/controller/evaluation.go | 4 +-- pkg/controller/invocation/controller.go | 19 ++++++----- pkg/fnenv/native/builtin/while.go | 1 - pkg/fnenv/workflows/workflows_test.go | 2 -- .../typedvalues/httpconv/httpconv_test.go | 3 -- pkg/util/gopool/gopool.go | 8 +++-- pkg/util/gopool/gopool_test.go | 32 +++++++++++++++++++ test/integration/bundle/bundle_test.go | 2 +- 8 files changed, 52 insertions(+), 19 deletions(-) create mode 100644 pkg/util/gopool/gopool_test.go diff --git a/pkg/controller/evaluation.go b/pkg/controller/evaluation.go index 0166a94b..29d82102 100644 --- a/pkg/controller/evaluation.go +++ b/pkg/controller/evaluation.go @@ -286,7 +286,7 @@ func (h *ConcurrentEvalStateHeap) Init() { } else { select { case h.popChan <- element: - h.fLock.Lock() + h.fLock.Lock() // TODO change into a less strict lock, commands will have to wait until it is done h.qLock.Lock() element = nil heap.Pop(h.heap) @@ -318,6 +318,7 @@ func (h *ConcurrentEvalStateHeap) Pop() *EvalState { // lock write-locks the queue, flushing the channel func (h *ConcurrentEvalStateHeap) lock() { + h.frontChan <- nil // clear the pop channel h.fLock.Lock() h.frontChan <- nil // clear the pop channel h.qLock.Lock() @@ -325,7 +326,6 @@ func (h *ConcurrentEvalStateHeap) lock() { // unlock write-unlocks the queue, filling the popChan func (h *ConcurrentEvalStateHeap) unlock() { - fmt.Println("len: ", h.heap.Len()) front := h.heap.Front() if front != nil { h.frontChan <- front.GetEvalState() diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index 438da65e..1671b518 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -122,13 +122,16 @@ func (cr *Controller) Init(sctx context.Context) error { for { select { case eval := <-queue: - err := pool.Submit(ctx, func() { - controller.EvalQueueSize.WithLabelValues("invocation").Dec() - cr.Evaluate(eval.ID()) - }) - if err != nil { - log.Errorf("failed to submit invocation %v for execution", eval.ID()) - } + func() { + ee := eval + err := pool.Submit(ctx, func() { + controller.EvalQueueSize.WithLabelValues("invocation").Dec() + cr.Evaluate(ee.ID()) + }) + if err != nil { + log.Errorf("failed to submit invocation %v for execution", eval.ID()) + } + }() case <-ctx.Done(): log.Info("Evaluation queue listener stopped.") return @@ -196,7 +199,7 @@ func (cr *Controller) Tick(tick uint64) error { // Long loop: to check if there are any orphans if tick%50 == 0 { - log.Debug("Checking model caches for missing invocations") + log.Info("Checking model caches for missing invocations") err = cr.checkModelCaches() } diff --git a/pkg/fnenv/native/builtin/while.go 
diff --git a/pkg/fnenv/native/builtin/while.go b/pkg/fnenv/native/builtin/while.go
index 3cb6e78b..cbbda985 100644
--- a/pkg/fnenv/native/builtin/while.go
+++ b/pkg/fnenv/native/builtin/while.go
@@ -162,7 +162,6 @@ func (fn *FunctionWhile) Invoke(spec *types.TaskInvocationSpec) (*types.TypedVal
 		}
 	}
 
-	fmt.Println("count", countTv)
 	wf := &types.WorkflowSpec{
 		OutputTask: "condition",
 		Tasks: map[string]*types.TaskSpec{

diff --git a/pkg/fnenv/workflows/workflows_test.go b/pkg/fnenv/workflows/workflows_test.go
index 34cec344..b01cb79a 100644
--- a/pkg/fnenv/workflows/workflows_test.go
+++ b/pkg/fnenv/workflows/workflows_test.go
@@ -3,7 +3,6 @@ package workflows
 import (
 	"context"
 	"errors"
-	"fmt"
 	"testing"
 	"time"
 
@@ -71,7 +70,6 @@ func TestRuntime_InvokeWorkflow_PollSuccess(t *testing.T) {
 	// Simulate workflow invocation
 	time.Sleep(50 * time.Millisecond)
 	entities := cache.List()
-	fmt.Println(entities)
 	wfiID := entities[0].Id
 	err := invocationAPI.Complete(wfiID, output)
 	if err != nil {

diff --git a/pkg/types/typedvalues/httpconv/httpconv_test.go b/pkg/types/typedvalues/httpconv/httpconv_test.go
index 97892b69..73911bb4 100644
--- a/pkg/types/typedvalues/httpconv/httpconv_test.go
+++ b/pkg/types/typedvalues/httpconv/httpconv_test.go
@@ -1,7 +1,6 @@
 package httpconv
 
 import (
-	"fmt"
 	"io"
 	"io/ioutil"
 	"net/http"
@@ -50,8 +49,6 @@ func TestFormatRequest(t *testing.T) {
 	assert.Equal(t, headers["Header-Key"], target.Header["Header-Key"][0])
 
 	// Check query
-	fmt.Println(query)
-	fmt.Println(target.URL.Query())
 	assert.Equal(t, query["queryKey"], target.URL.Query()["queryKey"][0])
 
 	// Check method

diff --git a/pkg/util/gopool/gopool.go b/pkg/util/gopool/gopool.go
index 8f34ffae..d75e08d8 100644
--- a/pkg/util/gopool/gopool.go
+++ b/pkg/util/gopool/gopool.go
@@ -1,4 +1,4 @@
-// package gopool provides functionality for bounded parallelism
+// package gopool provides functionality for bounded parallelism with goroutines
 package gopool
 
 import (
@@ -20,8 +20,12 @@ func New(maxRoutines int64) *GoPool {
 	}
 }
 
+func (g *GoPool) Max() int64 {
+	return g.maxRoutines
+}
+
 func (g *GoPool) Active() int64 {
-	return g.activeRoutines
+	return atomic.LoadInt64(&g.activeRoutines)
 }
 
 func (g *GoPool) Submit(ctx context.Context, fn func()) error {

diff --git a/pkg/util/gopool/gopool_test.go b/pkg/util/gopool/gopool_test.go
new file mode 100644
index 00000000..4325a5c0
--- /dev/null
+++ b/pkg/util/gopool/gopool_test.go
@@ -0,0 +1,32 @@
+package gopool
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestGoPool(t *testing.T) {
+	size := 100
+	ctx := context.Background()
+	pool := New(5)
+	// Overflow pool
+	wg := sync.WaitGroup{}
+	var sum int32
+	wg.Add(size)
+	assert.Equal(t, int64(0), pool.Active())
+	for i := 1; i <= size; i++ {
+		a := i
+		err := pool.Submit(ctx, func() {
+			atomic.AddInt32(&sum, int32(a))
+			wg.Done()
+		})
+		assert.NoError(t, err)
+	}
+	wg.Wait()
+	assert.Equal(t, int32(size*(size+1)/2), sum)
+	assert.Equal(t, int64(0), pool.Active())
+}

diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go
index dcf52dc9..df5d2621 100644
--- a/test/integration/bundle/bundle_test.go
+++ b/test/integration/bundle/bundle_test.go
@@ -613,7 +613,7 @@ func TestDeeplyNestedInvocation(t *testing.T) {
 	assert.True(t, wfi.Status.Successful())
 
 	output := typedvalues.MustFormat(wfi.Status.Output)
-	assert.Equal(t, 5, output)
+	assert.Equal(t, float64(5), output)
 }
 
 func setup() (apiserver.WorkflowAPIClient, apiserver.WorkflowInvocationAPIClient) {
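The gopool.go change above is the heart of this patch: Active() used to return the plain activeRoutines field while other goroutines were updating it concurrently (presumably via atomic increments in Submit), which `go test -race` reports as a read/write race. The rule the fix follows, sketched standalone below rather than as the gopool implementation itself, is that every access to a counter shared between goroutines has to go through sync/atomic (or a mutex), readers included.

    package sketch

    import "sync/atomic"

    var active int64

    func inc() { atomic.AddInt64(&active, 1) }
    func dec() { atomic.AddInt64(&active, -1) }

    // snapshot must load atomically as well; a plain `return active` is a data
    // race whenever inc or dec run concurrently, even if it often appears to work.
    func snapshot() int64 { return atomic.LoadInt64(&active) }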
From df0e551b83bf66fd39794660bfacbd544d50dd61 Mon Sep 17 00:00:00 2001
From: xiekeyang
Date: Tue, 21 Aug 2018 19:13:54 +0800
Subject: [PATCH 7/8] Add workqueue to Invocation Controller

This introduces a workqueue structure from the Kubernetes client-go
library, which supports thread-safe adding and polling of objects.

---
 pkg/controller/invocation/controller.go | 77 +++++++++++++++----------
 1 file changed, 46 insertions(+), 31 deletions(-)

diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go
index 1671b518..ba4fb722 100644
--- a/pkg/controller/invocation/controller.go
+++ b/pkg/controller/invocation/controller.go
@@ -6,6 +6,13 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/golang/protobuf/ptypes"
+	"github.com/opentracing/opentracing-go"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/workqueue"
+
 	"github.com/fission/fission-workflows/pkg/api"
 	"github.com/fission/fission-workflows/pkg/api/aggregates"
 	"github.com/fission/fission-workflows/pkg/api/events"
@@ -16,10 +23,6 @@ import (
 	"github.com/fission/fission-workflows/pkg/util/gopool"
 	"github.com/fission/fission-workflows/pkg/util/labels"
 	"github.com/fission/fission-workflows/pkg/util/pubsub"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/opentracing/opentracing-go"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/sirupsen/logrus"
 )
 
 const (
@@ -69,7 +72,7 @@ type Controller struct {
 	cancelFn   context.CancelFunc
 	evalPolicy controller.Rule
 	evalStore  *controller.EvalStore
-	evalQueue  *controller.ConcurrentEvalStateHeap
+	workQueue  workqueue.RateLimitingInterface
 }
 
 func NewController(invokeCache fes.CacheReader, wfCache fes.CacheReader, workflowScheduler *scheduler.WorkflowScheduler,
@@ -82,7 +85,7 @@ func NewController(invokeCache fes.CacheReader, wfCache fes.CacheReader, workflo
 		invocationAPI: invocationAPI,
 		stateStore:    stateStore,
 		evalStore:     &controller.EvalStore{},
-		evalQueue:     controller.NewConcurrentEvalStateHeap(true),
+		workQueue:     workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 	}
 	ctr.evalPolicy = defaultPolicy(ctr)
 
@@ -116,28 +119,7 @@ func (cr *Controller) Init(sctx context.Context) error {
 	}
 
 	// process evaluation queue
-	pool := gopool.New(maxParallelExecutions)
-	go func(ctx context.Context) {
-		queue := cr.evalQueue.Chan()
-		for {
-			select {
-			case eval := <-queue:
-				func() {
-					ee := eval
-					err := pool.Submit(ctx, func() {
-						controller.EvalQueueSize.WithLabelValues("invocation").Dec()
-						cr.Evaluate(ee.ID())
-					})
-					if err != nil {
-						log.Errorf("failed to submit invocation %v for execution", eval.ID())
-					}
-				}()
-			case <-ctx.Done():
-				log.Info("Evaluation queue listener stopped.")
-				return
-			}
-		}
-	}(ctx)
+	go wait.Until(cr.runWorker(sctx), time.Second, sctx.Done())
 
 	return nil
 }
@@ -183,7 +165,7 @@ func (cr *Controller) Notify(msg *fes.Notification) error {
 			panic(msg)
 		}
 		es := cr.evalStore.LoadOrStore(wfi.ID(), msg.SpanCtx)
-		cr.evalQueue.Push(es)
+		cr.workQueue.Add(es)
 	default:
 		log.Debugf("Controller ignored event type: %v", msg.EventType)
 	}
@@ -221,7 +203,7 @@ func (cr *Controller) checkEvalStore() error {
 			if time.Now().UnixNano() > reevaluateAt.UnixNano() {
 				controller.EvalRecovered.WithLabelValues(Name, "evalStore").Inc()
 				log.Infof("Adding missing invocation %v to the queue", id)
-				cr.evalQueue.Push(state)
+				cr.workQueue.Add(state)
 			}
 		}
 	}
 	return nil
@@ -248,7 +230,7 @@ func (cr *Controller) checkModelCaches() error {
 			span := opentracing.GlobalTracer().StartSpan("recoverFromModelCache")
 			controller.EvalRecovered.WithLabelValues(Name, "cache").Inc()
 			es := cr.evalStore.LoadOrStore(wi.ID(), span.Context())
-			cr.evalQueue.Push(es)
+			cr.workQueue.Add(es)
 			span.Finish()
 		}
 	}
@@ -357,6 +339,39 @@ func (cr *Controller) createFailAction(invocationID string, err error) controlle
 	}
 }
 
+func (cr *Controller) runWorker(ctx context.Context) func() {
+	return func() {
+		pool := gopool.New(maxParallelExecutions)
+
+		for cr.processNextItem(ctx, pool) {
+			// continue looping
+		}
+	}
+}
+
+func (cr *Controller) processNextItem(ctx context.Context, pool *gopool.GoPool) bool {
+	key, quit := cr.workQueue.Get()
+	if quit {
+		return false
+	}
+	defer cr.workQueue.Done(key)
+
+	es := key.(*controller.EvalState)
+
+	err := pool.Submit(ctx, func() {
+		controller.EvalQueueSize.WithLabelValues("invocation").Dec()
+		cr.Evaluate(es.ID())
+	})
+	if err != nil {
+		log.Errorf("failed to submit invocation %v for execution", es.ID())
+	}
+
+	// No error, reset the ratelimit counters
+	cr.workQueue.Forget(key)
+
+	return true
+}
+
 func defaultPolicy(ctr *Controller) controller.Rule {
 	return &controller.RuleEvalUntilAction{
 		Rules: []controller.Rule{
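The runWorker/processNextItem pair added above follows the standard client-go workqueue loop: Get blocks until an item is available or the queue is shut down, Done must be called once handling of that item is finished, and Forget clears the rate limiter's retry bookkeeping for it. A condensed, hypothetical sketch of the same loop with a plain string key and a caller-supplied handler (the real code stores *controller.EvalState items and hands them to a gopool):

    package sketch

    import "k8s.io/client-go/util/workqueue"

    // worker drains q until ShutDown is called; handle processes one key at a time.
    func worker(q workqueue.RateLimitingInterface, handle func(key string) error) {
        for {
            item, shutdown := q.Get()
            if shutdown {
                return
            }
            if err := handle(item.(string)); err != nil {
                q.AddRateLimited(item) // hypothetical retry; the patch above only logs the error
            } else {
                q.Forget(item) // stop tracking retries for this item
            }
            q.Done(item) // always mark the item as processed
        }
    }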
From 053efb8fe5ed0c317df21aa6c5e02069759c4dbb Mon Sep 17 00:00:00 2001
From: xiekeyang
Date: Tue, 21 Aug 2018 19:56:36 +0800
Subject: [PATCH 8/8] Remove state heap structure

---
 pkg/controller/evaluation.go | 153 -----------------------------------
 1 file changed, 153 deletions(-)

diff --git a/pkg/controller/evaluation.go b/pkg/controller/evaluation.go
index 29d82102..67d68b67 100644
--- a/pkg/controller/evaluation.go
+++ b/pkg/controller/evaluation.go
@@ -248,159 +248,6 @@ func (e *EvalLog) Record(record EvalRecord) {
 	*e = append(*e, record)
 }
 
-// ConcurrentEvalStateHeap is a thread-safe adaption of the EvalStateHeap
-type ConcurrentEvalStateHeap struct {
-	heap       *EvalStateHeap
-	qLock      sync.RWMutex // Note: pop is altering state, so it needs a write lock.
-	fLock      sync.Mutex
-	init       sync.Once
-	popChan    chan *EvalState
-	frontChan  chan *EvalState
-	updateCond sync.Cond
-	closeChan  chan struct{}
-}
-
-func NewConcurrentEvalStateHeap(unique bool) *ConcurrentEvalStateHeap {
-	h := &ConcurrentEvalStateHeap{
-		heap:      NewEvalStateHeap(unique),
-		popChan:   make(chan *EvalState),
-		frontChan: make(chan *EvalState),
-		closeChan: make(chan struct{}),
-	}
-	h.Init()
-	return h
-}
-
-func (h *ConcurrentEvalStateHeap) Init() {
-	h.init.Do(func() {
-		go func() {
-			var element *EvalState
-			for {
-				if element == nil {
-					select {
-					case element = <-h.frontChan:
-					case <-h.closeChan:
-						return
-					}
-
-				} else {
-					select {
-					case h.popChan <- element:
-						h.fLock.Lock() // TODO change into a less strict lock, commands will have to wait until it is done
-						h.qLock.Lock()
-						element = nil
-						heap.Pop(h.heap)
-						front := h.heap.Front()
-						if front != nil {
-							element = front.GetEvalState()
-						}
-						h.qLock.Unlock()
-						h.fLock.Unlock()
-					case element = <-h.frontChan:
-					case <-h.closeChan:
-						return
-					}
-				}
-			}
-		}()
-	})
-}
-
-func (h *ConcurrentEvalStateHeap) Pop() *EvalState {
-	h.lock()
-	if h.heap.Len() == 0 {
-		return nil
-	}
-	popped := heap.Pop(h.heap)
-	h.unlock()
-	return (popped).(*EvalState)
-}
-
-// lock write-locks the queue, flushing the channel
-func (h *ConcurrentEvalStateHeap) lock() {
-	h.frontChan <- nil // clear the pop channel
-	h.fLock.Lock()
-	h.frontChan <- nil // clear the pop channel
-	h.qLock.Lock()
-}
-
-// unlock write-unlocks the queue, filling the popChan
-func (h *ConcurrentEvalStateHeap) unlock() {
-	front := h.heap.Front()
-	if front != nil {
-		h.frontChan <- front.GetEvalState()
-	}
-	h.qLock.Unlock()
-	h.fLock.Unlock()
-}
-
-func (h *ConcurrentEvalStateHeap) Chan() <-chan *EvalState {
-	return h.popChan
-}
-
-func (h *ConcurrentEvalStateHeap) Len() int {
-	h.qLock.RLock()
-	count := h.heap.Len()
-	h.qLock.RUnlock()
-	return count
-}
-
-func (h *ConcurrentEvalStateHeap) Get(key string) *HeapItem {
-	h.qLock.RLock()
-	item := h.heap.Get(key)
-	h.qLock.RUnlock()
-	return item
-}
-
-func (h *ConcurrentEvalStateHeap) Front() *HeapItem {
-	h.qLock.RLock()
-	front := h.heap.Front()
-	h.qLock.RUnlock()
-	return front
-}
-
-func (h *ConcurrentEvalStateHeap) Update(s *EvalState) *HeapItem {
-	if s == nil {
-		return nil
-	}
-	h.lock()
-	updated := h.heap.Update(s)
-	h.unlock()
-	return updated
-}
-
-func (h *ConcurrentEvalStateHeap) UpdatePriority(s *EvalState, priority int) *HeapItem {
-	if s == nil {
-		return nil
-	}
-	h.lock()
-	updated := h.heap.UpdatePriority(s, priority)
-	h.unlock()
-	return updated
-}
-
-func (h *ConcurrentEvalStateHeap) Push(s *EvalState) {
-	if s == nil {
-		return
-	}
-	h.lock()
-	heap.Push(h.heap, s)
-	h.unlock()
-}
-
-func (h *ConcurrentEvalStateHeap) PushPriority(s *EvalState, priority int) {
-	if s == nil {
-		return
-	}
-	item := &HeapItem{
-		EvalState: s,
-		Priority:  priority,
-	}
-	h.lock()
-	heap.Push(h.heap, item)
-	h.unlock()
-}
-
 type HeapItem struct {
 	*EvalState
 	Priority int