Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use priority queue instead of queue #167

Merged
merged 8 commits into from
Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/controller/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/fission/fission-workflows/pkg/fes"
)

// TODO remove from EvalCache actions
// TODO remove from EvalStore actions

type ActionWait struct {
EvalState *EvalState
Expand All @@ -35,12 +35,12 @@ func (a *ActionSkip) Eval(rule EvalContext) Action {
}

type ActionRemoveFromEvalCache struct {
EvalCache *EvalCache
EvalCache *EvalStore
ID string
}

func (a *ActionRemoveFromEvalCache) Apply() error {
a.EvalCache.Del(a.ID)
a.EvalCache.Delete(a.ID)
return nil
}

Expand Down
202 changes: 160 additions & 42 deletions pkg/controller/evaluation.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package controller

import (
"container/heap"
"fmt"
"strings"
"sync"
"time"
Expand All @@ -11,65 +13,52 @@ import (
"github.com/sirupsen/logrus"
)

// EvalCache allows storing and retrieving EvalStates in a thread-safe way.
type EvalCache struct {
states map[string]*EvalState
lock sync.RWMutex
const (
DefaultPriority = 0
)

// EvalStore allows storing and retrieving EvalStates in a thread-safe way.
type EvalStore struct {
mp sync.Map
}

func NewEvalCache() *EvalCache {
return &EvalCache{
states: map[string]*EvalState{},
}
func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) *EvalState {
s, _ := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx))
return s.(*EvalState)
}

func (e *EvalCache) GetOrCreate(id string, spanCtx opentracing.SpanContext) *EvalState {
s, ok := e.Get(id)
func (e *EvalStore) Load(id string) (*EvalState, bool) {
s, ok := e.mp.Load(id)
if !ok {
s = NewEvalState(id, spanCtx)
e.Put(s)
return nil, false
}
return s
return s.(*EvalState), true
}

func (e *EvalCache) Get(id string) (*EvalState, bool) {
e.lock.RLock()
s, ok := e.states[id]
e.lock.RUnlock()
return s, ok
func (e *EvalStore) Store(state *EvalState) {
e.mp.Store(state.id, state)
}

func (e *EvalCache) Put(state *EvalState) {
e.lock.Lock()
e.states[state.id] = state
e.lock.Unlock()
func (e *EvalStore) Delete(id string) {
e.mp.Delete(id)
}

func (e *EvalCache) Del(id string) {
e.lock.Lock()
delete(e.states, id)
e.lock.Unlock()
}

func (e *EvalCache) List() map[string]*EvalState {
func (e *EvalStore) List() map[string]*EvalState {
results := map[string]*EvalState{}
e.lock.RLock()
for id, state := range e.states {
results[id] = state
}
e.lock.RUnlock()
e.mp.Range(func(k, v interface{}) bool {
results[k.(string)] = v.(*EvalState)
return true
})
return results
}

func (e *EvalCache) Close() error {
e.lock.RLock()
for _, es := range e.states {
func (e *EvalStore) Close() error {
for _, es := range e.List() {
err := es.Close()
if err != nil {
logrus.Errorf("Failed to close evaluation state: %v", err)
}
}
e.lock.RUnlock()
return nil
}

Expand Down Expand Up @@ -162,10 +151,13 @@ func (e *EvalState) Free() {
}

func (e *EvalState) ID() string {
if e == nil {
return ""
}
return e.id
}

func (e *EvalState) Count() int {
func (e *EvalState) Len() int {
e.dataLock.RLock()
defer e.dataLock.RUnlock()
return len(e.log)
Expand Down Expand Up @@ -234,19 +226,19 @@ func NewEvalRecord() EvalRecord {
// EvalLog is a time-ordered log of evaluation records. Newer records are appended to the end of the log.
type EvalLog []EvalRecord

func (e EvalLog) Count() int {
func (e EvalLog) Len() int {
return len(e)
}

func (e EvalLog) Last() (EvalRecord, bool) {
if e.Count() == 0 {
if e.Len() == 0 {
return EvalRecord{}, false
}
return e[len(e)-1], true
}

func (e EvalLog) First() (EvalRecord, bool) {
if e.Count() == 0 {
if e.Len() == 0 {
return EvalRecord{}, false
}
return e[0], true
Expand All @@ -255,3 +247,129 @@ func (e EvalLog) First() (EvalRecord, bool) {
func (e *EvalLog) Record(record EvalRecord) {
*e = append(*e, record)
}

type HeapItem struct {
*EvalState
Priority int
index int
}

func (h *HeapItem) GetEvalState() *EvalState {
if h == nil {
return nil
}
return h.EvalState
}

type EvalStateHeap struct {
heap []*HeapItem
items map[string]*HeapItem
unique bool
}

func NewEvalStateHeap(unique bool) *EvalStateHeap {
h := &EvalStateHeap{
items: map[string]*HeapItem{},
unique: unique,
}
heap.Init(h)
return h
}

func (h EvalStateHeap) Len() int {
return len(h.heap)
}

func (h EvalStateHeap) Front() *HeapItem {
if h.Len() == 0 {
return nil
}
return h.heap[0]
}

func (h EvalStateHeap) Less(i, j int) bool {
it := h.heap[i]
jt := h.heap[j]

// Check priorities (descending)
if it.Priority > jt.Priority {
return true
} else if it.Priority < jt.Priority {
return false
}

// If priorities are equal, compare timestamp (ascending)
return ignoreOk(it.Last()).Timestamp.Before(ignoreOk(jt.Last()).Timestamp)
}

func (h EvalStateHeap) Swap(i, j int) {
h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
h.heap[i].index = i
h.heap[j].index = j
}

// Do not use directly, use heap.Push!
// The signature of push requests an interface{} to adhere to the sort.Interface interface, but will panic if a
// a type other than *EvalState is provided.
func (h *EvalStateHeap) Push(x interface{}) {
switch t := x.(type) {
case *EvalState:
h.pushPriority(t, DefaultPriority)
case *HeapItem:
h.pushPriority(t.EvalState, t.Priority)
default:
panic(fmt.Sprintf("invalid entity submitted: %v", t))
}
}

func (h *EvalStateHeap) pushPriority(state *EvalState, priority int) {
if h.unique {
if _, ok := h.items[state.id]; ok {
h.UpdatePriority(state, priority)
return
}
}
el := &HeapItem{
EvalState: state,
Priority: priority,
index: h.Len(),
}
h.heap = append(h.heap, el)
h.items[state.id] = el
}

// Do not use directly, use heap.Pop!
func (h *EvalStateHeap) Pop() interface{} {
popped := h.heap[h.Len()-1]
delete(h.items, popped.id)
h.heap = h.heap[:h.Len()-1]
return popped.EvalState
}

func (h *EvalStateHeap) Update(es *EvalState) *HeapItem {
if existing, ok := h.items[es.id]; ok {
existing.EvalState = es
heap.Fix(h, existing.index)
return existing
}
return nil
}

func (h *EvalStateHeap) UpdatePriority(es *EvalState, priority int) *HeapItem {
if existing, ok := h.items[es.id]; ok {
existing.Priority = priority
existing.EvalState = es
heap.Fix(h, existing.index)
return existing
}
return nil
}

func (h *EvalStateHeap) Get(key string) *HeapItem {
item, _ := h.items[key]
return item
}

func ignoreOk(r EvalRecord, _ bool) EvalRecord {
return r
}
24 changes: 12 additions & 12 deletions pkg/controller/evaluation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestEvalLog_Append(t *testing.T) {
}
log.Record(record2)

assert.Equal(t, 2, log.Count())
assert.Equal(t, 2, log.Len())
last, ok := log.Last()
assert.True(t, ok)
assert.EqualValues(t, record2, last)
Expand Down Expand Up @@ -100,11 +100,11 @@ func TestEvalState_Count(t *testing.T) {
es := NewEvalState("id", nil)
assert.Equal(t, "id", es.ID())

c := es.Count()
c := es.Len()
assert.Equal(t, 0, c)

es.Record(dummyRecord)
c = es.Count()
c = es.Len()
assert.Equal(t, 1, c)
}

Expand All @@ -123,31 +123,31 @@ func TestEvalState_Logs(t *testing.T) {
}

func TestEvalCache_GetOrCreate(t *testing.T) {
ec := NewEvalCache()
ec := EvalStore{}
id := "foo"
es, ok := ec.Get(id)
es, ok := ec.Load(id)
assert.False(t, ok)
assert.Empty(t, es)

es = ec.GetOrCreate(id, nil)
es = ec.LoadOrStore(id, nil)
assert.Equal(t, id, es.ID())

es, ok = ec.Get(id)
es, ok = ec.Load(id)
assert.True(t, ok)
assert.Equal(t, id, es.ID())
}

func TestEvalCache_Invalidate(t *testing.T) {
ec := NewEvalCache()
ec := EvalStore{}
id := "completedId"

ec.Put(NewEvalState(id, nil))
es, ok := ec.Get(id)
ec.Store(NewEvalState(id, nil))
es, ok := ec.Load(id)
assert.True(t, ok)
assert.Equal(t, id, es.ID())

ec.Del(id)
es, ok = ec.Get(id)
ec.Delete(id)
es, ok = ec.Load(id)
assert.False(t, ok)
assert.Empty(t, es)
}
6 changes: 5 additions & 1 deletion pkg/controller/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ func (oe *JavascriptExpressionParser) resolveExpr(rootScope interface{}, current

go func() {
<-time.After(ResolvingTimeout)
scoped.Interrupt <- func() {
select {
case scoped.Interrupt <- func() {
panic(ErrTimeOut)
}:
default:
// evaluation has already been interrupted / quit
}
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/invocation/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (a *ActonAbort) Eval(cec controller.EvalContext) controller.Action {
}

func (a *ActonAbort) Apply() error {
wfiLog.Info("Applying action: abort")
log.Info("Applying action: abort")
return a.API.Cancel(a.InvocationID)
}

Expand All @@ -61,7 +61,7 @@ func (a *ActionFail) Eval(cec controller.EvalContext) controller.Action {
}

func (a *ActionFail) Apply() error {
wfiLog.Infof("Applying action: fail (%v)", a.Err)
log.Infof("Applying action: fail (%v)", a.Err)
return a.API.Fail(a.InvocationID, a.Err)
}

Expand Down
Loading