From 452a05d731672e8ff85590ac246ee81918caf13f Mon Sep 17 00:00:00 2001 From: Ashwin Ramesh Date: Wed, 12 Jul 2017 12:00:31 +1000 Subject: [PATCH 1/5] Switch key to string --- posting/list.go | 4 ++-- posting/lists.go | 9 ++++----- posting/lru.go | 16 ++++++++-------- posting/lru_test.go | 17 +++++++++-------- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/posting/list.go b/posting/list.go index 26f7b8aa123..cc68af29874 100644 --- a/posting/list.go +++ b/posting/list.go @@ -67,7 +67,7 @@ type List struct { x.SafeMutex index x.SafeMutex key []byte - ghash uint64 + ghash string plist *protos.PostingList mlayer []*protos.Posting // mutations lastCompact time.Time @@ -199,7 +199,7 @@ func getNew(key []byte, pstore *badger.KV) *List { l := listPool.Get().(*List) *l = List{} l.key = key - l.ghash = farm.Fingerprint64(key) + l.ghash = string(key) l.refcount = 1 l.Lock() diff --git a/posting/lists.go b/posting/lists.go index 3155f296b3f..19bdd283fe8 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -30,7 +30,6 @@ import ( "golang.org/x/net/trace" "github.com/dgraph-io/badger" - "github.com/dgryski/go-farm" "github.com/dgraph-io/dgraph/protos" "github.com/dgraph-io/dgraph/x" @@ -220,7 +219,7 @@ func periodicCommit() { } type fingerPrint struct { - fp uint64 + fp string gid uint32 } @@ -260,7 +259,7 @@ func Init(ps *badger.KV) { // And watermark stuff would have to be located outside worker pkg, maybe in x. // That way, we don't have a dependency conflict. func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { - fp := farm.Fingerprint64(key) + fp := string(key) lp := lcache.Get(fp) if lp != nil { @@ -299,7 +298,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { // Get takes a key and a groupID. It checks if the in-memory map has an // updated value and returns it if it exists or it gets from the store and DOES NOT ADD to lhmap. func Get(key []byte, gid uint32) (rlist *List, decr func()) { - fp := farm.Fingerprint64(key) + fp := string(key) lp := lcache.Get(fp) if lp != nil { @@ -342,7 +341,7 @@ func CommitLists(numRoutines int, group uint32) { }() } - lcache.Each(func(k uint64, l *List) { + lcache.Each(func(k string, l *List) { if l == nil { // To be safe. Check might be unnecessary. return } diff --git a/posting/lru.go b/posting/lru.go index d23d883e341..63eb97b2309 100644 --- a/posting/lru.go +++ b/posting/lru.go @@ -36,7 +36,7 @@ type listCache struct { curSize uint64 evicts uint64 ll *list.List - cache map[uint64]*list.Element + cache map[string]*list.Element } type CacheStats struct { @@ -46,7 +46,7 @@ type CacheStats struct { } type entry struct { - key uint64 + key string pl *List size uint64 } @@ -57,7 +57,7 @@ func newListCache(maxSize uint64) *listCache { ctx: context.Background(), MaxSize: maxSize, ll: list.New(), - cache: make(map[uint64]*list.Element), + cache: make(map[string]*list.Element), } } @@ -69,7 +69,7 @@ func (c *listCache) UpdateMaxSize() { // TODO: fingerprint can collide // Add adds a value to the cache. -func (c *listCache) PutIfMissing(key uint64, pl *List) (res *List) { +func (c *listCache) PutIfMissing(key string, pl *List) (res *List) { c.Lock() defer c.Unlock() @@ -117,7 +117,7 @@ func (c *listCache) removeOldest() { } // Get looks up a key's value from the cache. -func (c *listCache) Get(key uint64) (pl *List) { +func (c *listCache) Get(key string) (pl *List) { c.Lock() defer c.Unlock() @@ -144,7 +144,7 @@ func (c *listCache) Stats() CacheStats { } } -func (c *listCache) Each(f func(key uint64, val *List)) { +func (c *listCache) Each(f func(key string, val *List)) { c.Lock() defer c.Unlock() @@ -160,7 +160,7 @@ func (c *listCache) Reset() { c.Lock() defer c.Unlock() c.ll = list.New() - c.cache = make(map[uint64]*list.Element) + c.cache = make(map[string]*list.Element) c.curSize = 0 } @@ -178,7 +178,7 @@ func (c *listCache) Clear() error { } c.ll = list.New() - c.cache = make(map[uint64]*list.Element) + c.cache = make(map[string]*list.Element) c.curSize = 0 return nil } diff --git a/posting/lru_test.go b/posting/lru_test.go index bbf2630accd..9e1bdb5ab45 100644 --- a/posting/lru_test.go +++ b/posting/lru_test.go @@ -17,6 +17,7 @@ limitations under the License. package posting import ( + "fmt" "sync" "testing" @@ -39,7 +40,7 @@ func TestLCacheSize(t *testing.T) { for i := 0; i < 10; i++ { // Put a posting list of size 2 l := getPosting() - lcache.PutIfMissing(uint64(i), l) + lcache.PutIfMissing(fmt.Sprintf("%d", i), l) if i < 5 { require.Equal(t, lcache.curSize, uint64((i+1)*100)) } else { @@ -60,7 +61,7 @@ func TestLCacheSizeParallel(t *testing.T) { // Put a posting list of size 2 go func(i int) { l := getPosting() - lcache.PutIfMissing(uint64(i), l) + lcache.PutIfMissing(fmt.Sprintf("%d", i), l) wg.Done() }(i) } @@ -77,7 +78,7 @@ func TestLCacheEviction(t *testing.T) { for i := 0; i < 100; i++ { l := getPosting() // Put a posting list of size 2 - lcache.PutIfMissing(uint64(i), l) + lcache.PutIfMissing(fmt.Sprintf("%d", i), l) } require.Equal(t, lcache.curSize, uint64(5000)) @@ -85,15 +86,15 @@ func TestLCacheEviction(t *testing.T) { require.Equal(t, lcache.ll.Len(), 50) for i := 0; i < 50; i++ { - require.Nil(t, lcache.Get(uint64(i))) + require.Nil(t, lcache.Get(fmt.Sprintf("%d", i))) } } func TestLCachePutIfMissing(t *testing.T) { l := getPosting() - lcache.PutIfMissing(1, l) - require.Equal(t, l, lcache.Get(1)) + lcache.PutIfMissing("1", l) + require.Equal(t, l, lcache.Get("1")) l2 := getPosting() - lcache.PutIfMissing(1, l2) - require.Equal(t, l, lcache.Get(1)) + lcache.PutIfMissing("1", l2) + require.Equal(t, l, lcache.Get("1")) } From 97bddd77e5ec9ec96e2bda5bf4795a5a9e92e5ab Mon Sep 17 00:00:00 2001 From: Ashwin Ramesh Date: Wed, 12 Jul 2017 12:10:39 +1000 Subject: [PATCH 2/5] Update --- tok/tok.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tok/tok.go b/tok/tok.go index 699ff86d05f..202369e5ac2 100644 --- a/tok/tok.go +++ b/tok/tok.go @@ -19,6 +19,7 @@ package tok import ( "encoding/binary" + "fmt" "time" farm "github.com/dgryski/go-farm" @@ -241,6 +242,11 @@ func (t ExactTokenizer) Tokens(sv types.Val) ([]string, error) { if !ok { return nil, x.Errorf("Exact indices only supported for string types") } + if len(term) > 100 { + fmt.Printf("*****\n") + fmt.Printf("Long text for exact index. Consider switching to hash for better performance\n") + fmt.Printf("*****\n") + } return []string{encodeToken(term, t.Identifier())}, nil } func (t ExactTokenizer) Identifier() byte { return 0x2 } From 4524853cefdcf5bd41d546b64e4648a24de8e165 Mon Sep 17 00:00:00 2001 From: Ashwin Ramesh Date: Thu, 20 Jul 2017 18:47:30 +1000 Subject: [PATCH 3/5] Update --- posting/list.go | 4 ++-- posting/lists.go | 26 +++++++++++--------------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/posting/list.go b/posting/list.go index cc68af29874..e4f31c7c450 100644 --- a/posting/list.go +++ b/posting/list.go @@ -519,7 +519,7 @@ func (l *List) addMutation(ctx context.Context, t *protos.DirectedEdge) (bool, e gid = group.BelongsTo(t.Attr) } if dirtyChan != nil { - dirtyChan <- fingerPrint{fp: l.ghash, gid: gid} + dirtyChan <- l.key } } return hasMutated, nil @@ -547,7 +547,7 @@ func (l *List) delete(ctx context.Context, attr string) error { gid = group.BelongsTo(attr) } if dirtyChan != nil { - dirtyChan <- fingerPrint{fp: l.ghash, gid: gid} + dirtyChan <- l.key } return nil } diff --git a/posting/lists.go b/posting/lists.go index 19bdd283fe8..093f879fb99 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -110,7 +110,7 @@ func SyncMarkFor(group uint32) *x.WaterMark { return marks.Get(group) } -func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{}, +func gentleCommit(dirtyMap map[string]time.Time, pending chan struct{}, commitFraction float64) { select { case pending <- struct{}{}: @@ -126,7 +126,7 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{}, // Have a min value of n, so we can merge small number of dirty PLs fast. n = 1000 } - keysBuffer := make([]fingerPrint, 0, n) + keysBuffer := make([]string, 0, n) // Convert map to list. var loops int @@ -147,13 +147,13 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{}, } } - go func(keys []fingerPrint) { + go func(keys []string) { defer func() { <-pending }() if len(keys) == 0 { return } for _, key := range keys { - l := lcache.Get(key.fp) + l := lcache.Get(key) if l == nil { continue } @@ -169,7 +169,7 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{}, // merge and evict all posting lists from memory. func periodicCommit() { ticker := time.NewTicker(time.Second) - dirtyMap := make(map[fingerPrint]time.Time, 1000) + dirtyMap := make(map[string]time.Time, 1000) // pending is used to ensure that we only have up to 15 goroutines doing gentle commits. pending := make(chan struct{}, 15) dsize := 0 // needed for better reporting. @@ -177,7 +177,7 @@ func periodicCommit() { for { select { case key := <-dirtyChan: - dirtyMap[key] = time.Now() + dirtyMap[string(key)] = time.Now() case <-ticker.C: if len(dirtyMap) != dsize { @@ -219,7 +219,6 @@ func periodicCommit() { } type fingerPrint struct { - fp string gid uint32 } @@ -230,7 +229,7 @@ const ( var ( pstore *badger.KV syncCh chan syncEntry - dirtyChan chan fingerPrint // All dirty posting list keys are pushed here. + dirtyChan chan []byte // All dirty posting list keys are pushed here. marks *syncMarks lcache *listCache ) @@ -240,7 +239,7 @@ func Init(ps *badger.KV) { marks = new(syncMarks) pstore = ps lcache = newListCache(math.MaxUint64) - dirtyChan = make(chan fingerPrint, 10000) + dirtyChan = make(chan []byte, 10000) syncCh = make(chan syncEntry, syncChCapacity) go periodicCommit() @@ -259,9 +258,7 @@ func Init(ps *badger.KV) { // And watermark stuff would have to be located outside worker pkg, maybe in x. // That way, we don't have a dependency conflict. func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { - fp := string(key) - - lp := lcache.Get(fp) + lp := lcache.Get(string(key)) if lp != nil { x.CacheHit.Add(1) lp.incr() @@ -274,7 +271,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { l := getNew(key, pstore) // This retrieves a new *List and sets refcount to 1. l.water = marks.Get(group) - lp = lcache.PutIfMissing(fp, l) + lp = lcache.PutIfMissing(string(key), l) // We are always going to return lp to caller, whether it is l or not. So, let's // increment its reference counter. @@ -298,8 +295,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { // Get takes a key and a groupID. It checks if the in-memory map has an // updated value and returns it if it exists or it gets from the store and DOES NOT ADD to lhmap. func Get(key []byte, gid uint32) (rlist *List, decr func()) { - fp := string(key) - lp := lcache.Get(fp) + lp := lcache.Get(string(key)) if lp != nil { lp.incr() From c974f95d8edc06cef574e0fbef6e74b3fb1b1998 Mon Sep 17 00:00:00 2001 From: Ashwin Ramesh Date: Thu, 27 Jul 2017 14:25:56 +1000 Subject: [PATCH 4/5] Update --- posting/list.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/posting/list.go b/posting/list.go index e4f31c7c450..b5a1725ea32 100644 --- a/posting/list.go +++ b/posting/list.go @@ -67,7 +67,6 @@ type List struct { x.SafeMutex index x.SafeMutex key []byte - ghash string plist *protos.PostingList mlayer []*protos.Posting // mutations lastCompact time.Time @@ -199,7 +198,6 @@ func getNew(key []byte, pstore *badger.KV) *List { l := listPool.Get().(*List) *l = List{} l.key = key - l.ghash = string(key) l.refcount = 1 l.Lock() From ed9170ba01f4d5455250dd9a7691e5cb8a3474f1 Mon Sep 17 00:00:00 2001 From: Ashwin Ramesh Date: Thu, 27 Jul 2017 14:31:06 +1000 Subject: [PATCH 5/5] Update --- posting/list.go | 2 +- posting/lists.go | 8 +++----- posting/lru.go | 6 +++--- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/posting/list.go b/posting/list.go index f23720b2891..34192f8508a 100644 --- a/posting/list.go +++ b/posting/list.go @@ -757,7 +757,7 @@ func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) { } if delFromCache { x.AssertTrue(atomic.LoadInt32(&l.deleteMe) == 1) - lcache.delete(l.ghash) + lcache.delete(l.key) } } diff --git a/posting/lists.go b/posting/lists.go index e7048c182d1..40ecd181ff8 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -28,7 +28,6 @@ import ( "golang.org/x/net/trace" "github.com/dgraph-io/badger" - farm "github.com/dgryski/go-farm" "github.com/dgraph-io/dgraph/protos" "github.com/dgraph-io/dgraph/x" @@ -224,7 +223,7 @@ const ( var ( pstore *badger.KV - dirtyChan chan fingerPrint // All dirty posting list keys are pushed here. + dirtyChan chan []byte // All dirty posting list keys are pushed here. marks *syncMarks lcache *listCache ) @@ -235,7 +234,7 @@ func Init(ps *badger.KV) { pstore = ps lcache = newListCache(math.MaxUint64) x.LcacheCapacity.Set(math.MaxInt64) - dirtyChan = make(chan fingerPrint, 10000) + dirtyChan = make(chan []byte, 10000) go periodicCommit() } @@ -287,8 +286,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { // Get takes a key and a groupID. It checks if the in-memory map has an // updated value and returns it if it exists or it gets from the store and DOES NOT ADD to lhmap. func Get(key []byte) (rlist *List, decr func()) { - fp := farm.Fingerprint64(key) - lp := lcache.Get(fp) + lp := lcache.Get(string(key)) if lp != nil { return lp, lp.decr diff --git a/posting/lru.go b/posting/lru.go index 292cf54e8c9..acc06a71dda 100644 --- a/posting/lru.go +++ b/posting/lru.go @@ -198,13 +198,13 @@ func (c *listCache) clear(attr string) error { } // delete removes a key from cache -func (c *listCache) delete(key uint64) { +func (c *listCache) delete(key []byte) { c.Lock() defer c.Unlock() - if ele, ok := c.cache[key]; ok { + if ele, ok := c.cache[string(key)]; ok { c.ll.Remove(ele) - delete(c.cache, key) + delete(c.cache, string(key)) kv := ele.Value.(*entry) kv.pl.decr() }