diff --git a/posting/list.go b/posting/list.go index 14a186ce9ae..34192f8508a 100644 --- a/posting/list.go +++ b/posting/list.go @@ -67,7 +67,6 @@ type List struct { x.SafeMutex index x.SafeMutex key []byte - ghash uint64 plist *protos.PostingList mlayer []*protos.Posting // mutations lastCompact time.Time @@ -199,7 +198,6 @@ func getNew(key []byte, pstore *badger.KV) *List { l := listPool.Get().(*List) *l = List{} l.key = key - l.ghash = farm.Fingerprint64(key) l.refcount = 1 l.Lock() @@ -519,7 +517,7 @@ func (l *List) addMutation(ctx context.Context, t *protos.DirectedEdge) (bool, e gid = group.BelongsTo(t.Attr) } if dirtyChan != nil { - dirtyChan <- fingerPrint{fp: l.ghash, gid: gid} + dirtyChan <- l.key } } return hasMutated, nil @@ -550,7 +548,7 @@ func (l *List) delete(ctx context.Context, attr string) error { gid = group.BelongsTo(attr) } if dirtyChan != nil { - dirtyChan <- fingerPrint{fp: l.ghash, gid: gid} + dirtyChan <- l.key } return nil } @@ -759,7 +757,7 @@ func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) { } if delFromCache { x.AssertTrue(atomic.LoadInt32(&l.deleteMe) == 1) - lcache.delete(l.ghash) + lcache.delete(l.key) } } diff --git a/posting/lists.go b/posting/lists.go index 22b8e8840f4..40ecd181ff8 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -28,7 +28,6 @@ import ( "golang.org/x/net/trace" "github.com/dgraph-io/badger" - "github.com/dgryski/go-farm" "github.com/dgraph-io/dgraph/protos" "github.com/dgraph-io/dgraph/x" @@ -106,7 +105,7 @@ func SyncMarkFor(group uint32) *x.WaterMark { return marks.Get(group) } -func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{}, +func gentleCommit(dirtyMap map[string]time.Time, pending chan struct{}, commitFraction float64) { select { case pending <- struct{}{}: @@ -122,7 +121,7 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{}, // Have a min value of n, so we can merge small number of dirty PLs fast. n = 1000 } - keysBuffer := make([]fingerPrint, 0, n) + keysBuffer := make([]string, 0, n) // Convert map to list. var loops int @@ -143,13 +142,13 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{}, } } - go func(keys []fingerPrint) { + go func(keys []string) { defer func() { <-pending }() if len(keys) == 0 { return } for _, key := range keys { - l := lcache.Get(key.fp) + l := lcache.Get(key) if l == nil { continue } @@ -165,7 +164,7 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{}, // merge and evict all posting lists from memory. func periodicCommit() { ticker := time.NewTicker(time.Second) - dirtyMap := make(map[fingerPrint]time.Time, 1000) + dirtyMap := make(map[string]time.Time, 1000) // pending is used to ensure that we only have up to 15 goroutines doing gentle commits. pending := make(chan struct{}, 15) dsize := 0 // needed for better reporting. @@ -173,7 +172,7 @@ func periodicCommit() { for { select { case key := <-dirtyChan: - dirtyMap[key] = time.Now() + dirtyMap[string(key)] = time.Now() case <-ticker.C: if len(dirtyMap) != dsize { @@ -215,7 +214,6 @@ func periodicCommit() { } type fingerPrint struct { - fp uint64 gid uint32 } @@ -225,7 +223,7 @@ const ( var ( pstore *badger.KV - dirtyChan chan fingerPrint // All dirty posting list keys are pushed here. + dirtyChan chan []byte // All dirty posting list keys are pushed here. marks *syncMarks lcache *listCache ) @@ -236,7 +234,7 @@ func Init(ps *badger.KV) { pstore = ps lcache = newListCache(math.MaxUint64) x.LcacheCapacity.Set(math.MaxInt64) - dirtyChan = make(chan fingerPrint, 10000) + dirtyChan = make(chan []byte, 10000) go periodicCommit() } @@ -253,9 +251,7 @@ func Init(ps *badger.KV) { // And watermark stuff would have to be located outside worker pkg, maybe in x. // That way, we don't have a dependency conflict. func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { - fp := farm.Fingerprint64(key) - - lp := lcache.Get(fp) + lp := lcache.Get(string(key)) if lp != nil { x.CacheHit.Add(1) return lp, lp.decr @@ -269,7 +265,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { // We are always going to return lp to caller, whether it is l or not // lcache increments the ref counter - lp = lcache.PutIfMissing(fp, l) + lp = lcache.PutIfMissing(string(key), l) if lp != l { x.CacheRace.Add(1) @@ -290,8 +286,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { // Get takes a key and a groupID. It checks if the in-memory map has an // updated value and returns it if it exists or it gets from the store and DOES NOT ADD to lhmap. func Get(key []byte) (rlist *List, decr func()) { - fp := farm.Fingerprint64(key) - lp := lcache.Get(fp) + lp := lcache.Get(string(key)) if lp != nil { return lp, lp.decr @@ -332,7 +327,7 @@ func CommitLists(numRoutines int, group uint32) { }() } - lcache.Each(func(k uint64, l *List) { + lcache.Each(func(k string, l *List) { if l == nil { // To be safe. Check might be unnecessary. return } diff --git a/posting/lru.go b/posting/lru.go index 0882cc19e2d..9b188bd5cc9 100644 --- a/posting/lru.go +++ b/posting/lru.go @@ -38,7 +38,7 @@ type listCache struct { curSize uint64 evicts uint64 ll *list.List - cache map[uint64]*list.Element + cache map[string]*list.Element } type CacheStats struct { @@ -48,7 +48,7 @@ type CacheStats struct { } type entry struct { - key uint64 + key string pl *List size uint64 } @@ -59,7 +59,7 @@ func newListCache(maxSize uint64) *listCache { ctx: context.Background(), MaxSize: maxSize, ll: list.New(), - cache: make(map[uint64]*list.Element), + cache: make(map[string]*list.Element), } } @@ -78,7 +78,7 @@ func (c *listCache) UpdateMaxSize() { // TODO: fingerprint can collide // Add adds a value to the cache. -func (c *listCache) PutIfMissing(key uint64, pl *List) (res *List) { +func (c *listCache) PutIfMissing(key string, pl *List) (res *List) { c.Lock() defer c.Unlock() @@ -130,7 +130,7 @@ func (c *listCache) removeOldest() { } // Get looks up a key's value from the cache. -func (c *listCache) Get(key uint64) (pl *List) { +func (c *listCache) Get(key string) (pl *List) { c.Lock() defer c.Unlock() @@ -158,7 +158,7 @@ func (c *listCache) Stats() CacheStats { } } -func (c *listCache) Each(f func(key uint64, val *List)) { +func (c *listCache) Each(f func(key string, val *List)) { c.Lock() defer c.Unlock() @@ -174,7 +174,7 @@ func (c *listCache) Reset() { c.Lock() defer c.Unlock() c.ll = list.New() - c.cache = make(map[uint64]*list.Element) + c.cache = make(map[string]*list.Element) c.curSize = 0 } @@ -199,13 +199,13 @@ func (c *listCache) clear(attr string, typ byte) error { } // delete removes a key from cache -func (c *listCache) delete(key uint64) { +func (c *listCache) delete(key []byte) { c.Lock() defer c.Unlock() - if ele, ok := c.cache[key]; ok { + if ele, ok := c.cache[string(key)]; ok { c.ll.Remove(ele) - delete(c.cache, key) + delete(c.cache, string(key)) kv := ele.Value.(*entry) kv.pl.decr() } diff --git a/posting/lru_test.go b/posting/lru_test.go index bbf2630accd..9e1bdb5ab45 100644 --- a/posting/lru_test.go +++ b/posting/lru_test.go @@ -17,6 +17,7 @@ limitations under the License. package posting import ( + "fmt" "sync" "testing" @@ -39,7 +40,7 @@ func TestLCacheSize(t *testing.T) { for i := 0; i < 10; i++ { // Put a posting list of size 2 l := getPosting() - lcache.PutIfMissing(uint64(i), l) + lcache.PutIfMissing(fmt.Sprintf("%d", i), l) if i < 5 { require.Equal(t, lcache.curSize, uint64((i+1)*100)) } else { @@ -60,7 +61,7 @@ func TestLCacheSizeParallel(t *testing.T) { // Put a posting list of size 2 go func(i int) { l := getPosting() - lcache.PutIfMissing(uint64(i), l) + lcache.PutIfMissing(fmt.Sprintf("%d", i), l) wg.Done() }(i) } @@ -77,7 +78,7 @@ func TestLCacheEviction(t *testing.T) { for i := 0; i < 100; i++ { l := getPosting() // Put a posting list of size 2 - lcache.PutIfMissing(uint64(i), l) + lcache.PutIfMissing(fmt.Sprintf("%d", i), l) } require.Equal(t, lcache.curSize, uint64(5000)) @@ -85,15 +86,15 @@ func TestLCacheEviction(t *testing.T) { require.Equal(t, lcache.ll.Len(), 50) for i := 0; i < 50; i++ { - require.Nil(t, lcache.Get(uint64(i))) + require.Nil(t, lcache.Get(fmt.Sprintf("%d", i))) } } func TestLCachePutIfMissing(t *testing.T) { l := getPosting() - lcache.PutIfMissing(1, l) - require.Equal(t, l, lcache.Get(1)) + lcache.PutIfMissing("1", l) + require.Equal(t, l, lcache.Get("1")) l2 := getPosting() - lcache.PutIfMissing(1, l2) - require.Equal(t, l, lcache.Get(1)) + lcache.PutIfMissing("1", l2) + require.Equal(t, l, lcache.Get("1")) } diff --git a/tok/tok.go b/tok/tok.go index 3d01acc828b..de0856aa97a 100644 --- a/tok/tok.go +++ b/tok/tok.go @@ -19,6 +19,7 @@ package tok import ( "encoding/binary" + "fmt" "time" farm "github.com/dgryski/go-farm" @@ -241,6 +242,11 @@ func (t ExactTokenizer) Tokens(sv types.Val) ([]string, error) { if !ok { return nil, x.Errorf("Exact indices only supported for string types") } + if len(term) > 100 { + fmt.Printf("*****\n") + fmt.Printf("Long text for exact index. Consider switching to hash for better performance\n") + fmt.Printf("*****\n") + } return []string{encodeToken(term, t.Identifier())}, nil } func (t ExactTokenizer) Identifier() byte { return 0x2 }