Skip to content

Commit

Permalink
Make query after mutation before commit work. Make tests work.
Browse files Browse the repository at this point in the history
  • Loading branch information
manishrjain committed Dec 20, 2018
1 parent 4f9ad6f commit 4451b80
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 774 deletions.
682 changes: 0 additions & 682 deletions posting/btree.go

This file was deleted.

31 changes: 2 additions & 29 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,23 +444,12 @@ func DeleteCountIndex(attr string) error {
type rebuild struct {
prefix []byte
startTs uint64
cache map[string]*List

// The posting list passed here is the on disk version. It is not coming
// from the LRU cache.
fn func(uid uint64, pl *List, txn *Txn) error
}

// storeList would store the list in the cache.
func (r *rebuild) storeList(list *List) bool {
key := string(list.key)
if _, ok := r.cache[key]; ok {
return false
}
r.cache[key] = list
return true
}

func (r *rebuild) Run(ctx context.Context) error {
t := pstore.NewTransactionAt(r.startTs, false)
defer t.Discard()
Expand All @@ -477,23 +466,7 @@ func (r *rebuild) Run(ctx context.Context) error {
// localized posting list cache, to avoid stressing or mixing up with the
// global lcache (the LRU cache).
txn := &Txn{StartTs: r.startTs}
r.cache = make(map[string]*List)
var numGets uint64
txn.getList = func(key []byte) (*List, error) {
numGets++
if glog.V(2) && numGets%1000 == 0 {
glog.Infof("During rebuild, getList hit %d times\n", numGets)
}
if pl, ok := r.cache[string(key)]; ok {
return pl, nil
}
pl, err := getNew(key, pstore)
if err != nil {
return nil, err
}
r.cache[string(key)] = pl
return pl, nil
}
txn.UseLocalCache()

var prevKey []byte
for it.Rewind(); it.Valid(); {
Expand Down Expand Up @@ -715,7 +688,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error {

// Ensure that list is in the cache run by txn. Otherwise, nothing would
// get updated.
x.AssertTrue(builder.storeList(pl))
txn.cache.Set(string(pl.key), pl)
if err := pl.AddMutation(ctx, txn, t); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
x.Fatalf("Unhandled op: %v", op)
}
txn := Oracle().RegisterStartTs(startTs)
txn.cache.Set(string(l.key), l)
if index {
require.NoError(t, l.AddMutationWithIndex(context.Background(), edge, txn))
} else {
Expand Down Expand Up @@ -158,7 +159,6 @@ func TestTokensTable(t *testing.T) {
key := x.DataKey("name", 1)
l, err := getNew(key, ps)
require.NoError(t, err)
lcache.PutIfMissing(string(l.key), l)

edge := &pb.DirectedEdge{
Value: []byte("david"),
Expand Down
10 changes: 3 additions & 7 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ func addMutationHelper(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
func TestAddMutation(t *testing.T) {
key := x.DataKey("name", 2)

l, err := Get(key)
txn := &Txn{StartTs: uint64(1)}
txn.UseLocalCache()
l, err := txn.Get(key)
require.NoError(t, err)

txn := &Txn{StartTs: uint64(1)}
edge := &pb.DirectedEdge{
ValueId: 9,
Label: "testing",
Expand Down Expand Up @@ -118,11 +119,6 @@ func TestAddMutation(t *testing.T) {
p = getFirst(l, 3)
require.NotNil(t, p, "Unable to retrieve posting")
require.EqualValues(t, "anti-testing", p.Label)

// Try reading the same data in another PostingList.
dl, err := Get(key)
require.NoError(t, err)
checkUids(t, dl, uids, 3)
}

func getFirst(l *List, readTs uint64) (res pb.Posting) {
Expand Down
16 changes: 12 additions & 4 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,25 +177,33 @@ type LocalCache struct {
plists map[string]*List
}

func NewLocalCache() *LocalCache {
return &LocalCache{plists: make(map[string]*List)}
}

func (lc *LocalCache) getNoStore(key string) *List {
lc.RLock()
defer lc.RUnlock()
l := lc.plists[key]
return l
if l, ok := lc.plists[key]; ok {
return l
}
return nil
}

func (lc *LocalCache) Set(key string, updated *List) *List {
lc.Lock()
defer lc.Unlock()
pl, ok := lc.plists[key]
if ok {
if pl, ok := lc.plists[key]; ok {
return pl
}
lc.plists[key] = updated
return updated
}

func (lc *LocalCache) Get(key string) (*List, error) {
if lc == nil {
return getNew([]byte(key), pstore)
}
if pl := lc.getNoStore(key); pl != nil {
return pl, nil
}
Expand Down
34 changes: 19 additions & 15 deletions posting/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

var o *oracle
Expand Down Expand Up @@ -60,30 +61,22 @@ type Txn struct {
// determine unhealthy, stale txns.
lastUpdate time.Time

// getList can be set for a txn, to isolate the retrieval and storage of
// posting lists in a separate cache. If nil, global LRU cache is used.
getList func(key []byte) (*List, error)

cache *LocalCache
}

func (txn *Txn) UseLocalCache() {
if txn.cache != nil {
return
}
txn.cache = &LocalCache{
plists: make(map[string]*List),
}
txn.getList = func(key []byte) (*List, error) {
return txn.cache.Get(string(key))
glog.Fatalf("Cache is already initiated.")
}
txn.cache = NewLocalCache()
}

func (txn *Txn) Get(key []byte) (*List, error) {
if txn.getList == nil {
return Get(key)
}
return txn.getList(key)
return txn.cache.Get(string(key))
}

func (txn *Txn) Store(pl *List) {
txn.cache.Set(string(pl.key), pl)
}

type oracle struct {
Expand Down Expand Up @@ -117,11 +110,22 @@ func (o *oracle) RegisterStartTs(ts uint64) *Txn {
txn.lastUpdate = time.Now()
} else {
txn = &Txn{StartTs: ts, lastUpdate: time.Now()}
txn.UseLocalCache()
o.pendingTxns[ts] = txn
}
return txn
}

func (o *oracle) CacheAt(ts uint64) *LocalCache {
o.RLock()
defer o.RUnlock()
txn, ok := o.pendingTxns[ts]
if !ok {
return nil
}
return txn.cache
}

// MinPendingStartTs returns the min start ts which is currently pending a commit or abort decision.
func (o *oracle) MinPendingStartTs() uint64 {
o.RLock()
Expand Down
4 changes: 2 additions & 2 deletions query/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func addEdge(t *testing.T, attr string, src uint64, edge *pb.DirectedEdge) {
ValueType: edge.ValueType,
})
}
l, err := posting.Get(x.DataKey(attr, src))
require.NoError(t, err)
startTs := timestamp()
txn := posting.Oracle().RegisterStartTs(startTs)
l, err := txn.Get(x.DataKey(attr, src))
require.NoError(t, err)
require.NoError(t,
l.AddMutationWithIndex(context.Background(), edge, txn))

Expand Down
1 change: 0 additions & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) error
span.Annotatef(nil, "Txn %d should abort.", m.StartTs)
return dy.ErrConflict
}
txn.UseLocalCache()

span.Annotatef(nil, "To apply: %d edges", len(m.Edges))
var retries int
Expand Down
1 change: 1 addition & 0 deletions worker/predicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func commitTs(startTs uint64) uint64 {
func commitTransaction(t *testing.T, edge *pb.DirectedEdge, l *posting.List) {
startTs := timestamp()
txn := posting.Oracle().RegisterStartTs(startTs)
txn.Store(l)
err := l.AddMutationWithIndex(context.Background(), edge, txn)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit 4451b80

Please sign in to comment.