From e2bcfdad058148395b082e093d1fe63846f1132f Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 9 Nov 2018 11:07:54 -0800 Subject: [PATCH] Posting List Evictions - Avoid acquiring a read lock on posting.List during LRU cache eviction, by using atomics pendingTxns field, which gets incremented for every mutation, and potentially reset to 0 after a commit. - Only run evictions for 10ms out of a second. - Remove done field from LRU cache. We don't need to block on it during server shutdown anymore. Previously, we did because the PLs were being written to disk during evictions, but that's no longer the case. --- posting/list.go | 28 ++++++++++++++++++---------- posting/lists.go | 5 ----- posting/lru.go | 10 ++++------ worker/worker.go | 4 ---- 4 files changed, 22 insertions(+), 25 deletions(-) diff --git a/posting/list.go b/posting/list.go index 90b73ea44d8..ba8f47adf9e 100644 --- a/posting/list.go +++ b/posting/list.go @@ -74,8 +74,10 @@ type List struct { plist *pb.PostingList mutationMap map[uint64]*pb.PostingList minTs uint64 // commit timestamp of immutable layer, reject reads before this ts. - deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation. estimatedSize int32 + + pendingTxns int32 // Using atomic for this, to avoid locking in SetForDeletion operation. + deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation. } // calculateSize would give you the size estimate. This is expensive, so run it carefully. @@ -223,17 +225,12 @@ func (l *List) EstimatedSize() int32 { } // SetForDeletion will mark this List to be deleted, so no more mutations can be applied to this. +// Ensure that we don't acquire any locks during a call to this function, so the LRU cache can +// proceed smoothly. func (l *List) SetForDeletion() bool { - if l.AlreadyLocked() { + if atomic.LoadInt32(&l.pendingTxns) > 0 { return false } - l.RLock() - defer l.RUnlock() - for _, plist := range l.mutationMap { - if plist.CommitTs == 0 { - return false - } - } atomic.StoreInt32(&l.deleteMe, 1) return true } @@ -368,6 +365,7 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er l.updateMutationLayer(mpost) atomic.AddInt32(&l.estimatedSize, int32(mpost.Size()+16 /* various overhead */)) + atomic.AddInt32(&l.pendingTxns, 1) txn.AddKeys(string(l.key), conflictKey) return nil } @@ -394,8 +392,18 @@ func (l *List) commitMutation(startTs, commitTs uint64) error { if atomic.LoadInt32(&l.deleteMe) == 1 { return ErrRetry } - l.AssertLock() + + // Check if we still have a pending txn when we return from this function. + defer func() { + for _, plist := range l.mutationMap { + if plist.CommitTs == 0 { + return // Got a pending txn. + } + } + atomic.StoreInt32(&l.pendingTxns, 0) + }() + plist, ok := l.mutationMap[startTs] if !ok { // It was already committed, might be happening due to replay. diff --git a/posting/lists.go b/posting/lists.go index 292d1882720..18741e5b2b7 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -25,7 +25,6 @@ import ( "runtime" "strconv" "strings" - "sync/atomic" "time" "golang.org/x/net/trace" @@ -225,10 +224,6 @@ func Cleanup() { closer.SignalAndWait() } -func StopLRUEviction() { - atomic.StoreInt32(&lcache.done, 1) -} - // Get stores the List corresponding to key, if it's not there already. // to lru cache and returns it. // diff --git a/posting/lru.go b/posting/lru.go index 8c397d823a1..3e1eaaff702 100644 --- a/posting/lru.go +++ b/posting/lru.go @@ -23,7 +23,6 @@ import ( "container/list" "context" "sync" - "sync/atomic" "time" "github.com/dgraph-io/dgraph/x" @@ -41,7 +40,6 @@ type listCache struct { evicts uint64 ll *list.List cache map[string]*list.Element - done int32 } type CacheStats struct { @@ -113,17 +111,17 @@ func (c *listCache) removeOldestLoop() { defer ticker.Stop() for range ticker.C { c.removeOldest() - if atomic.LoadInt32(&c.done) > 0 { - return - } } } func (c *listCache) removeOldest() { c.Lock() defer c.Unlock() + + // Only allow evictions for 10ms out of a second. + deadline := time.Now().Add(10 * time.Millisecond) ele := c.ll.Back() - for c.curSize > c.MaxSize && atomic.LoadInt32(&c.done) == 0 { + for c.curSize > c.MaxSize && time.Now().Before(deadline) { if ele == nil { if c.curSize < 0 { c.curSize = 0 diff --git a/worker/worker.go b/worker/worker.go index dfbfaf441c8..7a10ccc8b41 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/badger" "github.com/dgraph-io/dgraph/conn" - "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -99,7 +98,4 @@ func BlockingStop() { glog.Infof("Stopping worker server...") workerServer.Stop() - - // TODO: What is this for? - posting.StopLRUEviction() }