Skip to content

Commit

Permalink
Merge pull request #167 from fission/eval-heap
Browse files Browse the repository at this point in the history
Use priority queue instead of queue
  • Loading branch information
erwinvaneyk authored Aug 28, 2018
2 parents 8c1965a + 053efb8 commit 397fea2
Show file tree
Hide file tree
Showing 18 changed files with 546 additions and 163 deletions.
6 changes: 3 additions & 3 deletions pkg/controller/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/fission/fission-workflows/pkg/fes"
)

// TODO remove from EvalCache actions
// TODO remove from EvalStore actions

type ActionWait struct {
EvalState *EvalState
Expand All @@ -35,12 +35,12 @@ func (a *ActionSkip) Eval(rule EvalContext) Action {
}

type ActionRemoveFromEvalCache struct {
EvalCache *EvalCache
EvalCache *EvalStore
ID string
}

func (a *ActionRemoveFromEvalCache) Apply() error {
a.EvalCache.Del(a.ID)
a.EvalCache.Delete(a.ID)
return nil
}

Expand Down
202 changes: 160 additions & 42 deletions pkg/controller/evaluation.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package controller

import (
"container/heap"
"fmt"
"strings"
"sync"
"time"
Expand All @@ -11,65 +13,52 @@ import (
"github.com/sirupsen/logrus"
)

// EvalCache allows storing and retrieving EvalStates in a thread-safe way.
type EvalCache struct {
states map[string]*EvalState
lock sync.RWMutex
const (
DefaultPriority = 0
)

// EvalStore allows storing and retrieving EvalStates in a thread-safe way.
type EvalStore struct {
mp sync.Map
}

func NewEvalCache() *EvalCache {
return &EvalCache{
states: map[string]*EvalState{},
}
func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) *EvalState {
s, _ := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx))
return s.(*EvalState)
}

func (e *EvalCache) GetOrCreate(id string, spanCtx opentracing.SpanContext) *EvalState {
s, ok := e.Get(id)
func (e *EvalStore) Load(id string) (*EvalState, bool) {
s, ok := e.mp.Load(id)
if !ok {
s = NewEvalState(id, spanCtx)
e.Put(s)
return nil, false
}
return s
return s.(*EvalState), true
}

func (e *EvalCache) Get(id string) (*EvalState, bool) {
e.lock.RLock()
s, ok := e.states[id]
e.lock.RUnlock()
return s, ok
func (e *EvalStore) Store(state *EvalState) {
e.mp.Store(state.id, state)
}

func (e *EvalCache) Put(state *EvalState) {
e.lock.Lock()
e.states[state.id] = state
e.lock.Unlock()
func (e *EvalStore) Delete(id string) {
e.mp.Delete(id)
}

func (e *EvalCache) Del(id string) {
e.lock.Lock()
delete(e.states, id)
e.lock.Unlock()
}

func (e *EvalCache) List() map[string]*EvalState {
func (e *EvalStore) List() map[string]*EvalState {
results := map[string]*EvalState{}
e.lock.RLock()
for id, state := range e.states {
results[id] = state
}
e.lock.RUnlock()
e.mp.Range(func(k, v interface{}) bool {
results[k.(string)] = v.(*EvalState)
return true
})
return results
}

func (e *EvalCache) Close() error {
e.lock.RLock()
for _, es := range e.states {
func (e *EvalStore) Close() error {
for _, es := range e.List() {
err := es.Close()
if err != nil {
logrus.Errorf("Failed to close evaluation state: %v", err)
}
}
e.lock.RUnlock()
return nil
}

Expand Down Expand Up @@ -162,10 +151,13 @@ func (e *EvalState) Free() {
}

func (e *EvalState) ID() string {
if e == nil {
return ""
}
return e.id
}

func (e *EvalState) Count() int {
func (e *EvalState) Len() int {
e.dataLock.RLock()
defer e.dataLock.RUnlock()
return len(e.log)
Expand Down Expand Up @@ -234,19 +226,19 @@ func NewEvalRecord() EvalRecord {
// EvalLog is a time-ordered log of evaluation records. Newer records are appended to the end of the log.
type EvalLog []EvalRecord

func (e EvalLog) Count() int {
func (e EvalLog) Len() int {
return len(e)
}

func (e EvalLog) Last() (EvalRecord, bool) {
if e.Count() == 0 {
if e.Len() == 0 {
return EvalRecord{}, false
}
return e[len(e)-1], true
}

func (e EvalLog) First() (EvalRecord, bool) {
if e.Count() == 0 {
if e.Len() == 0 {
return EvalRecord{}, false
}
return e[0], true
Expand All @@ -255,3 +247,129 @@ func (e EvalLog) First() (EvalRecord, bool) {
func (e *EvalLog) Record(record EvalRecord) {
*e = append(*e, record)
}

type HeapItem struct {
*EvalState
Priority int
index int
}

func (h *HeapItem) GetEvalState() *EvalState {
if h == nil {
return nil
}
return h.EvalState
}

type EvalStateHeap struct {
heap []*HeapItem
items map[string]*HeapItem
unique bool
}

func NewEvalStateHeap(unique bool) *EvalStateHeap {
h := &EvalStateHeap{
items: map[string]*HeapItem{},
unique: unique,
}
heap.Init(h)
return h
}

func (h EvalStateHeap) Len() int {
return len(h.heap)
}

func (h EvalStateHeap) Front() *HeapItem {
if h.Len() == 0 {
return nil
}
return h.heap[0]
}

func (h EvalStateHeap) Less(i, j int) bool {
it := h.heap[i]
jt := h.heap[j]

// Check priorities (descending)
if it.Priority > jt.Priority {
return true
} else if it.Priority < jt.Priority {
return false
}

// If priorities are equal, compare timestamp (ascending)
return ignoreOk(it.Last()).Timestamp.Before(ignoreOk(jt.Last()).Timestamp)
}

func (h EvalStateHeap) Swap(i, j int) {
h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
h.heap[i].index = i
h.heap[j].index = j
}

// Do not use directly, use heap.Push!
// The signature of push requests an interface{} to adhere to the sort.Interface interface, but will panic if a
// a type other than *EvalState is provided.
func (h *EvalStateHeap) Push(x interface{}) {
switch t := x.(type) {
case *EvalState:
h.pushPriority(t, DefaultPriority)
case *HeapItem:
h.pushPriority(t.EvalState, t.Priority)
default:
panic(fmt.Sprintf("invalid entity submitted: %v", t))
}
}

func (h *EvalStateHeap) pushPriority(state *EvalState, priority int) {
if h.unique {
if _, ok := h.items[state.id]; ok {
h.UpdatePriority(state, priority)
return
}
}
el := &HeapItem{
EvalState: state,
Priority: priority,
index: h.Len(),
}
h.heap = append(h.heap, el)
h.items[state.id] = el
}

// Do not use directly, use heap.Pop!
func (h *EvalStateHeap) Pop() interface{} {
popped := h.heap[h.Len()-1]
delete(h.items, popped.id)
h.heap = h.heap[:h.Len()-1]
return popped.EvalState
}

func (h *EvalStateHeap) Update(es *EvalState) *HeapItem {
if existing, ok := h.items[es.id]; ok {
existing.EvalState = es
heap.Fix(h, existing.index)
return existing
}
return nil
}

func (h *EvalStateHeap) UpdatePriority(es *EvalState, priority int) *HeapItem {
if existing, ok := h.items[es.id]; ok {
existing.Priority = priority
existing.EvalState = es
heap.Fix(h, existing.index)
return existing
}
return nil
}

func (h *EvalStateHeap) Get(key string) *HeapItem {
item, _ := h.items[key]
return item
}

func ignoreOk(r EvalRecord, _ bool) EvalRecord {
return r
}
24 changes: 12 additions & 12 deletions pkg/controller/evaluation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestEvalLog_Append(t *testing.T) {
}
log.Record(record2)

assert.Equal(t, 2, log.Count())
assert.Equal(t, 2, log.Len())
last, ok := log.Last()
assert.True(t, ok)
assert.EqualValues(t, record2, last)
Expand Down Expand Up @@ -100,11 +100,11 @@ func TestEvalState_Count(t *testing.T) {
es := NewEvalState("id", nil)
assert.Equal(t, "id", es.ID())

c := es.Count()
c := es.Len()
assert.Equal(t, 0, c)

es.Record(dummyRecord)
c = es.Count()
c = es.Len()
assert.Equal(t, 1, c)
}

Expand All @@ -123,31 +123,31 @@ func TestEvalState_Logs(t *testing.T) {
}

func TestEvalCache_GetOrCreate(t *testing.T) {
ec := NewEvalCache()
ec := EvalStore{}
id := "foo"
es, ok := ec.Get(id)
es, ok := ec.Load(id)
assert.False(t, ok)
assert.Empty(t, es)

es = ec.GetOrCreate(id, nil)
es = ec.LoadOrStore(id, nil)
assert.Equal(t, id, es.ID())

es, ok = ec.Get(id)
es, ok = ec.Load(id)
assert.True(t, ok)
assert.Equal(t, id, es.ID())
}

func TestEvalCache_Invalidate(t *testing.T) {
ec := NewEvalCache()
ec := EvalStore{}
id := "completedId"

ec.Put(NewEvalState(id, nil))
es, ok := ec.Get(id)
ec.Store(NewEvalState(id, nil))
es, ok := ec.Load(id)
assert.True(t, ok)
assert.Equal(t, id, es.ID())

ec.Del(id)
es, ok = ec.Get(id)
ec.Delete(id)
es, ok = ec.Load(id)
assert.False(t, ok)
assert.Empty(t, es)
}
6 changes: 5 additions & 1 deletion pkg/controller/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ func (oe *JavascriptExpressionParser) resolveExpr(rootScope interface{}, current

go func() {
<-time.After(ResolvingTimeout)
scoped.Interrupt <- func() {
select {
case scoped.Interrupt <- func() {
panic(ErrTimeOut)
}:
default:
// evaluation has already been interrupted / quit
}
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/invocation/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (a *ActonAbort) Eval(cec controller.EvalContext) controller.Action {
}

func (a *ActonAbort) Apply() error {
wfiLog.Info("Applying action: abort")
log.Info("Applying action: abort")
return a.API.Cancel(a.InvocationID)
}

Expand All @@ -61,7 +61,7 @@ func (a *ActionFail) Eval(cec controller.EvalContext) controller.Action {
}

func (a *ActionFail) Apply() error {
wfiLog.Infof("Applying action: fail (%v)", a.Err)
log.Infof("Applying action: fail (%v)", a.Err)
return a.API.Fail(a.InvocationID, a.Err)
}

Expand Down
Loading

0 comments on commit 397fea2

Please sign in to comment.