Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mlayer/skiplist #1214

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,22 @@ func (l *List) AddMutationWithIndex(ctx context.Context, t *protos.DirectedEdge)
val, found = l.findValue(math.MaxUint64)
}
}
countBefore := l.length(0)
var countBefore, countAfter int
doCount := schema.State().HasCount(t.Attr)
if doCount {
countBefore = l.length(0)
}
_, err := l.addMutation(ctx, t)
countAfter := l.length(0)
if doCount {
countAfter = l.length(0)
}
l.Unlock()

if err != nil {
return err
}
x.PredicateStats.Add(t.Attr, 1)
if countAfter != countBefore && schema.State().HasCount(t.Attr) {
if doCount && countAfter != countBefore {
if err := updateCount(ctx, countParams{
attr: t.Attr,
countBefore: countBefore,
Expand Down
110 changes: 58 additions & 52 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"unsafe"

"github.com/dgraph-io/badger"
"github.com/ryszard/goskiplist/skiplist"
"golang.org/x/net/trace"

"github.com/dgryski/go-farm"
Expand Down Expand Up @@ -69,7 +70,8 @@ type List struct {
key []byte
ghash uint64
plist *protos.PostingList
mlayer []*protos.Posting // mutations
mlayer *skiplist.SkipList // mutation layer
plen int
lastCompact time.Time
deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation.
refcount int32
Expand Down Expand Up @@ -110,7 +112,7 @@ func (l *List) calculateSize() uint32 {
sz := int(unsafe.Sizeof(l))
sz += l.plist.Size()
sz += cap(l.key)
sz += cap(l.mlayer) * 8
sz += l.mlayer.Len() * 8
sz += cap(l.pending) * 8
return uint32(sz)
}
Expand Down Expand Up @@ -195,12 +197,19 @@ func (it *PIterator) Posting() *protos.Posting {
return it.uidPosting
}

func getNewSL() *skiplist.SkipList {
return skiplist.NewCustomMap(func(l, r interface{}) bool {
return l.(uint64) < r.(uint64)
})
}

func getNew(key []byte, pstore *badger.KV) *List {
l := listPool.Get().(*List)
*l = List{}
l.key = key
l.ghash = farm.Fingerprint64(key)
l.refcount = 1
l.mlayer = getNewSL()

l.Lock()
defer l.Unlock()
Expand All @@ -224,6 +233,7 @@ func getNew(key []byte, pstore *badger.KV) *List {
if val != nil {
x.Checkf(l.plist.Unmarshal(val), "Unable to Unmarshal PostingList from store")
}
l.plen = len(l.plist.Uids) / 8
atomic.StoreUint32(&l.estimatedSize, l.calculateSize())
return l
}
Expand Down Expand Up @@ -323,15 +333,11 @@ func (l *List) updateMutationLayer(mpost *protos.Posting) bool {
x.AssertTrue(mpost.Op == Set || mpost.Op == Del)

// First check the mutable layer.
midx := sort.Search(len(l.mlayer), func(idx int) bool {
mp := l.mlayer[idx]
return mpost.Uid <= mp.Uid
})
oldP, ok := l.mlayer.Get(mpost.Uid)

// This block handles the case where mpost.UID is found in mutation layer.
if midx < len(l.mlayer) && l.mlayer[midx].Uid == mpost.Uid {
// mp is the posting found in mlayer.
oldPost := l.mlayer[midx]
if ok {
oldPost := oldP.(*protos.Posting)

// Note that mpost.Op is either Set or Del, whereas oldPost.Op can be
// either Set or Del or Add.
Expand Down Expand Up @@ -359,15 +365,22 @@ func (l *List) updateMutationLayer(mpost *protos.Posting) bool {
if oldPost.Op == Add {
if mpost.Op == Del {
// Undo old post.
copy(l.mlayer[midx:], l.mlayer[midx+1:])
l.mlayer[len(l.mlayer)-1] = nil
l.mlayer = l.mlayer[:len(l.mlayer)-1]
l.plen--
l.mlayer.Delete(mpost.Uid)
return true
}
// Add followed by Set is considered an Add. Hence, mutate mpost.Op.
mpost.Op = Add
} else if oldPost.Op == Del {
if mpost.Op == Set {
l.plen++
}
} else {
if mpost.Op == Del {
l.plen--
}
}
l.mlayer[midx] = mpost
l.mlayer.Set(mpost.Uid, mpost)
return true
}

Expand All @@ -388,24 +401,19 @@ func (l *List) updateMutationLayer(mpost *protos.Posting) bool {
}
if !uidFound {
// Posting not found in PL. This is considered an Add operation.
l.plen++
mpost.Op = Add
}
} else if !psame { // mpost.Op==Del
// Either we fail to find UID in immutable PL or contents don't match.
return false
}

// Doesn't match what we already have in immutable layer. So, add to mutable layer.
if midx >= len(l.mlayer) {
// Add it at the end.
l.mlayer = append(l.mlayer, mpost)
return true
} else {
if psame {
l.plen--
} else {
// Either we fail to find UID in immutable PL or contents don't match.
return false
}
}

// Otherwise, add it where midx is pointing to.
l.mlayer = append(l.mlayer, nil)
copy(l.mlayer[midx+1:], l.mlayer[midx:])
l.mlayer[midx] = mpost
l.mlayer.Set(mpost.Uid, mpost)
return true
}

Expand Down Expand Up @@ -503,7 +511,7 @@ func (l *List) addMutation(ctx context.Context, t *protos.DirectedEdge) (bool, e
hasMutated := l.updateMutationLayer(mpost)
if dur := time.Since(t1); dur > time.Millisecond {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("updated mutation layer %v %v %v", dur, len(l.mlayer), len(l.plist.Postings))
tr.LazyPrintf("updated mutation layer %v %v %v", dur, l.mlayer.Len(), len(l.plist.Postings))
}
}

Expand Down Expand Up @@ -536,7 +544,8 @@ func (l *List) delete(ctx context.Context, attr string) error {
postingListPool.Put(l.plist)
}
l.plist = emptyList
l.mlayer = l.mlayer[:0] // Clear the mutation layer.
l.mlayer = getNewSL() // Clear the mutation layer.
l.plen = 0
atomic.StoreInt32(&l.deleteAll, 1)

var gid uint32
Expand Down Expand Up @@ -574,29 +583,21 @@ func (l *List) Iterate(afterUid uint64, f func(obj *protos.Posting) bool) {

func (l *List) iterate(afterUid uint64, f func(obj *protos.Posting) bool) {
l.AssertRLock()
midx := 0

mlayerLen := len(l.mlayer)
if afterUid > 0 {
midx = sort.Search(mlayerLen, func(idx int) bool {
mp := l.mlayer[idx]
return afterUid < mp.Uid
})
}

var mp, pp *protos.Posting
cont := true
var pitr PIterator
pitr.Init(l.plist, afterUid)
mitr := l.mlayer.Iterator()
mok := mitr.Seek(afterUid + 1)
for cont {
if pitr.Valid() {
pp = pitr.Posting()
} else {
pp = emptyPosting
}

if midx < mlayerLen {
mp = l.mlayer[midx]
if mok {
mp = mitr.Value().(*protos.Posting)
} else {
mp = emptyPosting
}
Expand All @@ -611,13 +612,13 @@ func (l *List) iterate(afterUid uint64, f func(obj *protos.Posting) bool) {
if mp.Op != Del {
cont = f(mp)
}
midx++
mok = mitr.Next()
case pp.Uid == mp.Uid:
if mp.Op != Del {
cont = f(mp)
}
pitr.Next()
midx++
mok = mitr.Next()
default:
log.Fatalf("Unhandled case during iteration of posting list.")
}
Expand All @@ -626,25 +627,28 @@ func (l *List) iterate(afterUid uint64, f func(obj *protos.Posting) bool) {

func (l *List) length(afterUid uint64) int {
l.AssertRLock()
if afterUid == 0 {
return l.plen
}

uidx, midx := 0, 0
uidx := 0
pl := l.plist

if afterUid > 0 {
uidx = findUidIndex(pl, afterUid)
midx = sort.Search(len(l.mlayer), func(idx int) bool {
mp := l.mlayer[idx]
return afterUid < mp.Uid
})
}

count := len(pl.Uids)/8 - uidx
for _, p := range l.mlayer[midx:] {
mitr := l.mlayer.Iterator()
mok := mitr.Seek(afterUid + 1)
for mok {
p := mitr.Value().(*protos.Posting)
if p.Op == Add {
count++
} else if p.Op == Del {
count--
}
mok = mitr.Next()
}
return count
}
Expand All @@ -670,7 +674,7 @@ func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {

// deleteAll is used to differentiate when we don't have any updates, v/s
// when we have explicitly deleted everything.
if len(l.mlayer) == 0 && atomic.LoadInt32(&l.deleteAll) == 0 {
if l.mlayer.Len() == 0 && atomic.LoadInt32(&l.deleteAll) == 0 {
l.water.Ch <- x.Mark{Indices: l.pending, Done: true}
l.pending = make([]uint64, 0, 3)
return false, nil
Expand Down Expand Up @@ -707,6 +711,8 @@ func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {
}
return true
})
final.Uids = final.Uids[:8*count]
l.plen = count

var data []byte
if len(final.Uids) == 0 {
Expand Down Expand Up @@ -766,7 +772,7 @@ func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {
doAsyncWrite(l.key, data, f)
// Now reset the mutation variables.
l.pending = make([]uint64, 0, 3)
l.mlayer = l.mlayer[:0]
l.mlayer = getNewSL()
l.lastCompact = time.Now()
atomic.StoreInt32(&l.deleteAll, 0) // Unset deleteAll
return true, nil
Expand All @@ -783,7 +789,7 @@ func (l *List) LastCompactionTs() time.Time {
func (l *List) Uids(opt ListOptions) *protos.List {
// Pre-assign length to make it faster.
l.RLock()
res := make([]uint64, 0, l.length(opt.AfterUID))
res := make([]uint64, 0, l.length(0))
l.iterate(opt.AfterUID, func(p *protos.Posting) bool {
if postingType(p) == x.ValueUid {
res = append(res, p.Uid)
Expand Down
5 changes: 3 additions & 2 deletions posting/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (

func getPosting() *List {
l := &List{
plist: &protos.PostingList{},
water: marks.Get(1),
plist: &protos.PostingList{},
water: marks.Get(1),
mlayer: getNewSL(),
}
l.incr()
return l
Expand Down