Skip to content

Commit

Permalink
Posting List Evictions
Browse files Browse the repository at this point in the history
- Avoid acquiring a read lock on posting.List during LRU cache eviction,
by using atomics pendingTxns field, which gets incremented for every
mutation, and potentially reset to 0 after a commit.
- Only run evictions for 10ms out of a second.
- Remove done field from LRU cache. We don't need to block on it during
server shutdown anymore. Previously, we did because the PLs
were being written to disk during evictions, but that's no longer the
case.
  • Loading branch information
manishrjain committed Nov 9, 2018
1 parent b5ac4d7 commit e2bcfda
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 25 deletions.
28 changes: 18 additions & 10 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ type List struct {
plist *pb.PostingList
mutationMap map[uint64]*pb.PostingList
minTs uint64 // commit timestamp of immutable layer, reject reads before this ts.
deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation.
estimatedSize int32

pendingTxns int32 // Using atomic for this, to avoid locking in SetForDeletion operation.
deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation.
}

// calculateSize would give you the size estimate. This is expensive, so run it carefully.
Expand Down Expand Up @@ -223,17 +225,12 @@ func (l *List) EstimatedSize() int32 {
}

// SetForDeletion will mark this List to be deleted, so no more mutations can be applied to this.
// Ensure that we don't acquire any locks during a call to this function, so the LRU cache can
// proceed smoothly.
func (l *List) SetForDeletion() bool {
if l.AlreadyLocked() {
if atomic.LoadInt32(&l.pendingTxns) > 0 {
return false
}
l.RLock()
defer l.RUnlock()
for _, plist := range l.mutationMap {
if plist.CommitTs == 0 {
return false
}
}
atomic.StoreInt32(&l.deleteMe, 1)
return true
}
Expand Down Expand Up @@ -368,6 +365,7 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er

l.updateMutationLayer(mpost)
atomic.AddInt32(&l.estimatedSize, int32(mpost.Size()+16 /* various overhead */))
atomic.AddInt32(&l.pendingTxns, 1)
txn.AddKeys(string(l.key), conflictKey)
return nil
}
Expand All @@ -394,8 +392,18 @@ func (l *List) commitMutation(startTs, commitTs uint64) error {
if atomic.LoadInt32(&l.deleteMe) == 1 {
return ErrRetry
}

l.AssertLock()

// Check if we still have a pending txn when we return from this function.
defer func() {
for _, plist := range l.mutationMap {
if plist.CommitTs == 0 {
return // Got a pending txn.
}
}
atomic.StoreInt32(&l.pendingTxns, 0)
}()

plist, ok := l.mutationMap[startTs]
if !ok {
// It was already committed, might be happening due to replay.
Expand Down
5 changes: 0 additions & 5 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"

"golang.org/x/net/trace"
Expand Down Expand Up @@ -225,10 +224,6 @@ func Cleanup() {
closer.SignalAndWait()
}

func StopLRUEviction() {
atomic.StoreInt32(&lcache.done, 1)
}

// Get stores the List corresponding to key, if it's not there already.
// to lru cache and returns it.
//
Expand Down
10 changes: 4 additions & 6 deletions posting/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"container/list"
"context"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/dgraph/x"
Expand All @@ -41,7 +40,6 @@ type listCache struct {
evicts uint64
ll *list.List
cache map[string]*list.Element
done int32
}

type CacheStats struct {
Expand Down Expand Up @@ -113,17 +111,17 @@ func (c *listCache) removeOldestLoop() {
defer ticker.Stop()
for range ticker.C {
c.removeOldest()
if atomic.LoadInt32(&c.done) > 0 {
return
}
}
}

func (c *listCache) removeOldest() {
c.Lock()
defer c.Unlock()

// Only allow evictions for 10ms out of a second.
deadline := time.Now().Add(10 * time.Millisecond)
ele := c.ll.Back()
for c.curSize > c.MaxSize && atomic.LoadInt32(&c.done) == 0 {
for c.curSize > c.MaxSize && time.Now().Before(deadline) {
if ele == nil {
if c.curSize < 0 {
c.curSize = 0
Expand Down
4 changes: 0 additions & 4 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"

Expand Down Expand Up @@ -99,7 +98,4 @@ func BlockingStop() {

glog.Infof("Stopping worker server...")
workerServer.Stop()

// TODO: What is this for?
posting.StopLRUEviction()
}

0 comments on commit e2bcfda

Please sign in to comment.