diff --git a/posting/list.go b/posting/list.go index 90b73ea44d8..ba8f47adf9e 100644 --- a/posting/list.go +++ b/posting/list.go @@ -74,8 +74,10 @@ type List struct { plist *pb.PostingList mutationMap map[uint64]*pb.PostingList minTs uint64 // commit timestamp of immutable layer, reject reads before this ts. - deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation. estimatedSize int32 + + pendingTxns int32 // Using atomic for this, to avoid locking in SetForDeletion operation. + deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation. } // calculateSize would give you the size estimate. This is expensive, so run it carefully. @@ -223,17 +225,12 @@ func (l *List) EstimatedSize() int32 { } // SetForDeletion will mark this List to be deleted, so no more mutations can be applied to this. +// Ensure that we don't acquire any locks during a call to this function, so the LRU cache can +// proceed smoothly. func (l *List) SetForDeletion() bool { - if l.AlreadyLocked() { + if atomic.LoadInt32(&l.pendingTxns) > 0 { return false } - l.RLock() - defer l.RUnlock() - for _, plist := range l.mutationMap { - if plist.CommitTs == 0 { - return false - } - } atomic.StoreInt32(&l.deleteMe, 1) return true } @@ -368,6 +365,7 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er l.updateMutationLayer(mpost) atomic.AddInt32(&l.estimatedSize, int32(mpost.Size()+16 /* various overhead */)) + atomic.AddInt32(&l.pendingTxns, 1) txn.AddKeys(string(l.key), conflictKey) return nil } @@ -394,8 +392,18 @@ func (l *List) commitMutation(startTs, commitTs uint64) error { if atomic.LoadInt32(&l.deleteMe) == 1 { return ErrRetry } - l.AssertLock() + + // Check if we still have a pending txn when we return from this function. + defer func() { + for _, plist := range l.mutationMap { + if plist.CommitTs == 0 { + return // Got a pending txn. + } + } + atomic.StoreInt32(&l.pendingTxns, 0) + }() + plist, ok := l.mutationMap[startTs] if !ok { // It was already committed, might be happening due to replay. diff --git a/posting/lists.go b/posting/lists.go index 292d1882720..18741e5b2b7 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -25,7 +25,6 @@ import ( "runtime" "strconv" "strings" - "sync/atomic" "time" "golang.org/x/net/trace" @@ -225,10 +224,6 @@ func Cleanup() { closer.SignalAndWait() } -func StopLRUEviction() { - atomic.StoreInt32(&lcache.done, 1) -} - // Get stores the List corresponding to key, if it's not there already. // to lru cache and returns it. // diff --git a/posting/lru.go b/posting/lru.go index 8c397d823a1..3e1eaaff702 100644 --- a/posting/lru.go +++ b/posting/lru.go @@ -23,7 +23,6 @@ import ( "container/list" "context" "sync" - "sync/atomic" "time" "github.com/dgraph-io/dgraph/x" @@ -41,7 +40,6 @@ type listCache struct { evicts uint64 ll *list.List cache map[string]*list.Element - done int32 } type CacheStats struct { @@ -113,17 +111,17 @@ func (c *listCache) removeOldestLoop() { defer ticker.Stop() for range ticker.C { c.removeOldest() - if atomic.LoadInt32(&c.done) > 0 { - return - } } } func (c *listCache) removeOldest() { c.Lock() defer c.Unlock() + + // Only allow evictions for 10ms out of a second. + deadline := time.Now().Add(10 * time.Millisecond) ele := c.ll.Back() - for c.curSize > c.MaxSize && atomic.LoadInt32(&c.done) == 0 { + for c.curSize > c.MaxSize && time.Now().Before(deadline) { if ele == nil { if c.curSize < 0 { c.curSize = 0 diff --git a/worker/worker.go b/worker/worker.go index dfbfaf441c8..7a10ccc8b41 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/badger" "github.com/dgraph-io/dgraph/conn" - "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -99,7 +98,4 @@ func BlockingStop() { glog.Infof("Stopping worker server...") workerServer.Stop() - - // TODO: What is this for? - posting.StopLRUEviction() }