Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch/keys #1181

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ type List struct {
x.SafeMutex
index x.SafeMutex
key []byte
ghash uint64
plist *protos.PostingList
mlayer []*protos.Posting // mutations
lastCompact time.Time
Expand Down Expand Up @@ -199,7 +198,6 @@ func getNew(key []byte, pstore *badger.KV) *List {
l := listPool.Get().(*List)
*l = List{}
l.key = key
l.ghash = farm.Fingerprint64(key)
l.refcount = 1

l.Lock()
Expand Down Expand Up @@ -519,7 +517,7 @@ func (l *List) addMutation(ctx context.Context, t *protos.DirectedEdge) (bool, e
gid = group.BelongsTo(t.Attr)
}
if dirtyChan != nil {
dirtyChan <- fingerPrint{fp: l.ghash, gid: gid}
dirtyChan <- l.key
}
}
return hasMutated, nil
Expand Down Expand Up @@ -550,7 +548,7 @@ func (l *List) delete(ctx context.Context, attr string) error {
gid = group.BelongsTo(attr)
}
if dirtyChan != nil {
dirtyChan <- fingerPrint{fp: l.ghash, gid: gid}
dirtyChan <- l.key
}
return nil
}
Expand Down Expand Up @@ -759,7 +757,7 @@ func (l *List) SyncIfDirty(delFromCache bool) (committed bool, err error) {
}
if delFromCache {
x.AssertTrue(atomic.LoadInt32(&l.deleteMe) == 1)
lcache.delete(l.ghash)
lcache.delete(l.key)
}
}

Expand Down
29 changes: 12 additions & 17 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"golang.org/x/net/trace"

"github.com/dgraph-io/badger"
"github.com/dgryski/go-farm"

"github.com/dgraph-io/dgraph/protos"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -106,7 +105,7 @@ func SyncMarkFor(group uint32) *x.WaterMark {
return marks.Get(group)
}

func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{},
func gentleCommit(dirtyMap map[string]time.Time, pending chan struct{},
commitFraction float64) {
select {
case pending <- struct{}{}:
Expand All @@ -122,7 +121,7 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{},
// Have a min value of n, so we can merge small number of dirty PLs fast.
n = 1000
}
keysBuffer := make([]fingerPrint, 0, n)
keysBuffer := make([]string, 0, n)

// Convert map to list.
var loops int
Expand All @@ -143,13 +142,13 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{},
}
}

go func(keys []fingerPrint) {
go func(keys []string) {
defer func() { <-pending }()
if len(keys) == 0 {
return
}
for _, key := range keys {
l := lcache.Get(key.fp)
l := lcache.Get(key)
if l == nil {
continue
}
Expand All @@ -165,15 +164,15 @@ func gentleCommit(dirtyMap map[fingerPrint]time.Time, pending chan struct{},
// merge and evict all posting lists from memory.
func periodicCommit() {
ticker := time.NewTicker(time.Second)
dirtyMap := make(map[fingerPrint]time.Time, 1000)
dirtyMap := make(map[string]time.Time, 1000)
// pending is used to ensure that we only have up to 15 goroutines doing gentle commits.
pending := make(chan struct{}, 15)
dsize := 0 // needed for better reporting.
setLruMemory := true
for {
select {
case key := <-dirtyChan:
dirtyMap[key] = time.Now()
dirtyMap[string(key)] = time.Now()

case <-ticker.C:
if len(dirtyMap) != dsize {
Expand Down Expand Up @@ -215,7 +214,6 @@ func periodicCommit() {
}

type fingerPrint struct {
fp uint64
gid uint32
}

Expand All @@ -225,7 +223,7 @@ const (

var (
pstore *badger.KV
dirtyChan chan fingerPrint // All dirty posting list keys are pushed here.
dirtyChan chan []byte // All dirty posting list keys are pushed here.
marks *syncMarks
lcache *listCache
)
Expand All @@ -236,7 +234,7 @@ func Init(ps *badger.KV) {
pstore = ps
lcache = newListCache(math.MaxUint64)
x.LcacheCapacity.Set(math.MaxInt64)
dirtyChan = make(chan fingerPrint, 10000)
dirtyChan = make(chan []byte, 10000)

go periodicCommit()
}
Expand All @@ -253,9 +251,7 @@ func Init(ps *badger.KV) {
// And watermark stuff would have to be located outside worker pkg, maybe in x.
// That way, we don't have a dependency conflict.
func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) {
fp := farm.Fingerprint64(key)

lp := lcache.Get(fp)
lp := lcache.Get(string(key))
if lp != nil {
x.CacheHit.Add(1)
return lp, lp.decr
Expand All @@ -269,7 +265,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) {

// We are always going to return lp to caller, whether it is l or not
// lcache increments the ref counter
lp = lcache.PutIfMissing(fp, l)
lp = lcache.PutIfMissing(string(key), l)

if lp != l {
x.CacheRace.Add(1)
Expand All @@ -290,8 +286,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) {
// Get takes a key and a groupID. It checks if the in-memory map has an
// updated value and returns it if it exists or it gets from the store and DOES NOT ADD to lhmap.
func Get(key []byte) (rlist *List, decr func()) {
fp := farm.Fingerprint64(key)
lp := lcache.Get(fp)
lp := lcache.Get(string(key))

if lp != nil {
return lp, lp.decr
Expand Down Expand Up @@ -332,7 +327,7 @@ func CommitLists(numRoutines int, group uint32) {
}()
}

lcache.Each(func(k uint64, l *List) {
lcache.Each(func(k string, l *List) {
if l == nil { // To be safe. Check might be unnecessary.
return
}
Expand Down
20 changes: 10 additions & 10 deletions posting/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type listCache struct {
curSize uint64
evicts uint64
ll *list.List
cache map[uint64]*list.Element
cache map[string]*list.Element
}

type CacheStats struct {
Expand All @@ -48,7 +48,7 @@ type CacheStats struct {
}

type entry struct {
key uint64
key string
pl *List
size uint64
}
Expand All @@ -59,7 +59,7 @@ func newListCache(maxSize uint64) *listCache {
ctx: context.Background(),
MaxSize: maxSize,
ll: list.New(),
cache: make(map[uint64]*list.Element),
cache: make(map[string]*list.Element),
}
}

Expand All @@ -78,7 +78,7 @@ func (c *listCache) UpdateMaxSize() {

// TODO: fingerprint can collide
// Add adds a value to the cache.
func (c *listCache) PutIfMissing(key uint64, pl *List) (res *List) {
func (c *listCache) PutIfMissing(key string, pl *List) (res *List) {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -130,7 +130,7 @@ func (c *listCache) removeOldest() {
}

// Get looks up a key's value from the cache.
func (c *listCache) Get(key uint64) (pl *List) {
func (c *listCache) Get(key string) (pl *List) {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -158,7 +158,7 @@ func (c *listCache) Stats() CacheStats {
}
}

func (c *listCache) Each(f func(key uint64, val *List)) {
func (c *listCache) Each(f func(key string, val *List)) {
c.Lock()
defer c.Unlock()

Expand All @@ -174,7 +174,7 @@ func (c *listCache) Reset() {
c.Lock()
defer c.Unlock()
c.ll = list.New()
c.cache = make(map[uint64]*list.Element)
c.cache = make(map[string]*list.Element)
c.curSize = 0
}

Expand All @@ -199,13 +199,13 @@ func (c *listCache) clear(attr string, typ byte) error {
}

// delete removes a key from cache
func (c *listCache) delete(key uint64) {
func (c *listCache) delete(key []byte) {
c.Lock()
defer c.Unlock()

if ele, ok := c.cache[key]; ok {
if ele, ok := c.cache[string(key)]; ok {
c.ll.Remove(ele)
delete(c.cache, key)
delete(c.cache, string(key))
kv := ele.Value.(*entry)
kv.pl.decr()
}
Expand Down
17 changes: 9 additions & 8 deletions posting/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package posting

import (
"fmt"
"sync"
"testing"

Expand All @@ -39,7 +40,7 @@ func TestLCacheSize(t *testing.T) {
for i := 0; i < 10; i++ {
// Put a posting list of size 2
l := getPosting()
lcache.PutIfMissing(uint64(i), l)
lcache.PutIfMissing(fmt.Sprintf("%d", i), l)
if i < 5 {
require.Equal(t, lcache.curSize, uint64((i+1)*100))
} else {
Expand All @@ -60,7 +61,7 @@ func TestLCacheSizeParallel(t *testing.T) {
// Put a posting list of size 2
go func(i int) {
l := getPosting()
lcache.PutIfMissing(uint64(i), l)
lcache.PutIfMissing(fmt.Sprintf("%d", i), l)
wg.Done()
}(i)
}
Expand All @@ -77,23 +78,23 @@ func TestLCacheEviction(t *testing.T) {
for i := 0; i < 100; i++ {
l := getPosting()
// Put a posting list of size 2
lcache.PutIfMissing(uint64(i), l)
lcache.PutIfMissing(fmt.Sprintf("%d", i), l)
}

require.Equal(t, lcache.curSize, uint64(5000))
require.Equal(t, lcache.evicts, uint64(50))
require.Equal(t, lcache.ll.Len(), 50)

for i := 0; i < 50; i++ {
require.Nil(t, lcache.Get(uint64(i)))
require.Nil(t, lcache.Get(fmt.Sprintf("%d", i)))
}
}

func TestLCachePutIfMissing(t *testing.T) {
l := getPosting()
lcache.PutIfMissing(1, l)
require.Equal(t, l, lcache.Get(1))
lcache.PutIfMissing("1", l)
require.Equal(t, l, lcache.Get("1"))
l2 := getPosting()
lcache.PutIfMissing(1, l2)
require.Equal(t, l, lcache.Get(1))
lcache.PutIfMissing("1", l2)
require.Equal(t, l, lcache.Get("1"))
}
6 changes: 6 additions & 0 deletions tok/tok.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tok

import (
"encoding/binary"
"fmt"
"time"

farm "github.com/dgryski/go-farm"
Expand Down Expand Up @@ -241,6 +242,11 @@ func (t ExactTokenizer) Tokens(sv types.Val) ([]string, error) {
if !ok {
return nil, x.Errorf("Exact indices only supported for string types")
}
if len(term) > 100 {
fmt.Printf("*****\n")
fmt.Printf("Long text for exact index. Consider switching to hash for better performance\n")
fmt.Printf("*****\n")
}
return []string{encodeToken(term, t.Identifier())}, nil
}
func (t ExactTokenizer) Identifier() byte { return 0x2 }
Expand Down