Skip to content

Commit

Permalink
remove state heap structure
Browse files Browse the repository at this point in the history
  • Loading branch information
xiekeyang authored and erwinvaneyk committed Aug 27, 2018
1 parent 290002b commit e4103e6
Showing 1 changed file with 0 additions and 153 deletions.
153 changes: 0 additions & 153 deletions pkg/controller/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,159 +248,6 @@ func (e *EvalLog) Record(record EvalRecord) {
*e = append(*e, record)
}

// ConcurrentEvalStateHeap is a thread-safe adaption of the EvalStateHeap
type ConcurrentEvalStateHeap struct {
heap *EvalStateHeap
qLock sync.RWMutex // Note: pop is altering state, so it needs a write _lock.
fLock sync.Mutex
init sync.Once
popChan chan *EvalState
frontChan chan *EvalState
updateCond sync.Cond
closeChan chan struct{}
}

func NewConcurrentEvalStateHeap(unique bool) *ConcurrentEvalStateHeap {
h := &ConcurrentEvalStateHeap{
heap: NewEvalStateHeap(unique),
popChan: make(chan *EvalState),
frontChan: make(chan *EvalState),
closeChan: make(chan struct{}),
}
h.Init()
return h
}

func (h *ConcurrentEvalStateHeap) Init() {
h.init.Do(func() {
go func() {
var element *EvalState
for {
if element == nil {
select {
case element = <-h.frontChan:
case <-h.closeChan:
return
}

} else {
select {
case h.popChan <- element:
h.fLock.Lock() // TODO change into a less strict lock, commands will have to wait until it is done
h.qLock.Lock()
element = nil
heap.Pop(h.heap)
front := h.heap.Front()
if front != nil {
element = front.GetEvalState()
}
h.qLock.Unlock()
h.fLock.Unlock()
case element = <-h.frontChan:
case <-h.closeChan:
return
}
}
}
}()
})
}

func (h *ConcurrentEvalStateHeap) Pop() *EvalState {
h.lock()
if h.heap.Len() == 0 {
return nil
}
popped := heap.Pop(h.heap)
h.unlock()
return (popped).(*EvalState)
}

// lock write-locks the queue, flushing the channel
func (h *ConcurrentEvalStateHeap) lock() {
h.frontChan <- nil // clear the pop channel
h.fLock.Lock()
h.frontChan <- nil // clear the pop channel
h.qLock.Lock()
}

// unlock write-unlocks the queue, filling the popChan
func (h *ConcurrentEvalStateHeap) unlock() {
front := h.heap.Front()
if front != nil {
h.frontChan <- front.GetEvalState()
}
h.qLock.Unlock()
h.fLock.Unlock()
}

func (h *ConcurrentEvalStateHeap) Chan() <-chan *EvalState {
return h.popChan
}

func (h *ConcurrentEvalStateHeap) Len() int {
h.qLock.RLock()
count := h.heap.Len()
h.qLock.RUnlock()
return count
}

func (h *ConcurrentEvalStateHeap) Get(key string) *HeapItem {
h.qLock.RLock()
item := h.heap.Get(key)
h.qLock.RUnlock()
return item
}

func (h *ConcurrentEvalStateHeap) Front() *HeapItem {
h.qLock.RLock()
front := h.heap.Front()
h.qLock.RUnlock()
return front
}

func (h *ConcurrentEvalStateHeap) Update(s *EvalState) *HeapItem {
if s == nil {
return nil
}
h.lock()
updated := h.heap.Update(s)
h.unlock()
return updated
}

func (h *ConcurrentEvalStateHeap) UpdatePriority(s *EvalState, priority int) *HeapItem {
if s == nil {
return nil
}
h.lock()
updated := h.heap.UpdatePriority(s, priority)
h.unlock()
return updated
}

func (h *ConcurrentEvalStateHeap) Push(s *EvalState) {
if s == nil {
return
}
h.lock()
heap.Push(h.heap, s)
h.unlock()
}

func (h *ConcurrentEvalStateHeap) PushPriority(s *EvalState, priority int) {
if s == nil {
return
}
item := &HeapItem{
EvalState: s,
Priority: priority,
}
h.lock()
heap.Push(h.heap, item)
h.unlock()
}

type HeapItem struct {
*EvalState
Priority int
Expand Down

0 comments on commit e4103e6

Please sign in to comment.