From a3ecbb0427e313fff56b74b89e77ff5f37ff7726 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 8 Feb 2019 16:05:44 -0800 Subject: [PATCH 01/11] Work on optimizing XidToUid map. --- dgraph/cmd/bulk/loader.go | 7 +- dgraph/cmd/bulk/mapper.go | 4 +- dgraph/cmd/live/run.go | 13 +-- x/x.go | 2 +- xidmap/xidmap.go | 206 +++++++++++++------------------------- 5 files changed, 75 insertions(+), 157 deletions(-) diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index a9bce8dcf6b..711748d08d1 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -157,10 +157,7 @@ func (ld *loader) mapStage() { var err error ld.xidDB, err = badger.Open(opt) x.Check(err) - ld.xids = xidmap.New(ld.xidDB, ld.zero, xidmap.Options{ - NumShards: 1 << 10, - LRUSize: 1 << 19, - }) + ld.xids = xidmap.New(ld.xidDB, ld.zero) var dir, ext string var loaderType int @@ -226,7 +223,7 @@ func (ld *loader) mapStage() { for i := range ld.mappers { ld.mappers[i] = nil } - ld.xids.EvictAll() + // ld.xids.EvictAll() x.Check(ld.xidDB.Close()) ld.xids = nil runtime.GC() diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index ba4b4e53a38..01844388b2f 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -217,8 +217,8 @@ func (m *mapper) processNQuad(nq gql.NQuad) { } func (m *mapper) lookupUid(xid string) uint64 { - uid, isNew := m.xids.AssignUid(xid) - if !isNew || !m.opt.StoreXids { + uid := m.xids.AssignUid(xid) + if !m.opt.StoreXids { return uid } if strings.HasPrefix(xid, "_:") { diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index 5882e8f298e..5e9965aabe5 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -158,13 +158,11 @@ func (l *loader) uid(val string) string { // to be an existing node in the graph. There is limited protection against // a user selecting an unassigned UID in this way - it may be assigned // later to another node. It is up to the user to avoid this. - if strings.HasPrefix(val, "0x") { - if _, err := strconv.ParseUint(val[2:], 16, 64); err == nil { - return val - } + if uid, err := strconv.ParseUint(val, 0, 64); err == nil { + return fmt.Sprintf("%#x", uid) } - uid, _ := l.alloc.AssignUid(val) + uid := l.alloc.AssignUid(val) return fmt.Sprintf("%#x", uint64(uid)) } @@ -258,10 +256,6 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader { alloc := xidmap.New( kv, connzero, - xidmap.Options{ - NumShards: 100, - LRUSize: 1e5, - }, ) l := &loader{ @@ -332,7 +326,6 @@ func run() error { l := setup(bmOpts, dgraphClient) defer l.zeroconn.Close() defer l.kv.Close() - defer l.alloc.EvictAll() if len(opt.schemaFile) > 0 { if err := processSchemaFile(ctx, opt.schemaFile, dgraphClient); err != nil { diff --git a/x/x.go b/x/x.go index 945e0bd473e..a0f1d48d517 100644 --- a/x/x.go +++ b/x/x.go @@ -433,7 +433,7 @@ func SetupConnection(host string, tlsConf *TLSHelperConfig, useGz bool) (*grpc.C grpc.WithBlock(), grpc.WithTimeout(10*time.Second)) - if tlsConf.CertRequired { + if tlsConf != nil && tlsConf.CertRequired { tlsConf.ConfigType = TLSClientConfig tlsCfg, _, err := GenerateTLSConfig(*tlsConf) if err != nil { diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index 395bbe8f4a5..cf20d7b1930 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -17,7 +17,6 @@ package xidmap import ( - "container/list" "context" "encoding/binary" "sync" @@ -28,56 +27,32 @@ import ( "github.com/dgraph-io/badger" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" - farm "github.com/dgryski/go-farm" "github.com/golang/glog" ) -// Options controls the performance characteristics of the XidMap. -type Options struct { - // NumShards controls the number of shards the XidMap is broken into. More - // shards reduces lock contention. - NumShards int - // LRUSize controls the total size of the LRU cache. The LRU is split - // between all shards, so with 4 shards and an LRUSize of 100, each shard - // receives 25 LRU slots. - LRUSize int -} - // XidMap allocates and tracks mappings between Xids and Uids in a threadsafe // manner. It's memory friendly because the mapping is stored on disk, but fast // because it uses an LRU cache. type XidMap struct { - shards []shard - kv *badger.DB - opt Options - newRanges chan *pb.AssignedIds - - noMapMu sync.Mutex - noMap block // block for allocating uids without an xid to uid mapping -} - -type shard struct { sync.Mutex - block - - elems map[string]*list.Element - queue *list.List - beingEvicted map[string]uint64 - - xm *XidMap -} + kv *badger.DB + // opt Options + newRanges chan *pb.AssignedIds + uidMap sync.Map + writer *badger.WriteBatch -type mapping struct { - xid string - uid uint64 - persisted bool + block *block } type block struct { + sync.Mutex start, end uint64 } func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 { + b.Lock() + defer b.Unlock() + if b.end == 0 || b.start > b.end { newRange := <-ch b.start, b.end = newRange.StartId, newRange.EndId @@ -89,19 +64,14 @@ func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 { } // New creates an XidMap with given badger and uid provider. -func New(kv *badger.DB, zero *grpc.ClientConn, opt Options) *XidMap { - x.AssertTrue(opt.LRUSize != 0) - x.AssertTrue(opt.NumShards != 0) +func New(kv *badger.DB, zero *grpc.ClientConn) *XidMap { + // x.AssertTrue(opt.LRUSize != 0) + // x.AssertTrue(opt.NumShards != 0) xm := &XidMap{ - shards: make([]shard, opt.NumShards), kv: kv, - opt: opt, - newRanges: make(chan *pb.AssignedIds), - } - for i := range xm.shards { - xm.shards[i].elems = make(map[string]*list.Element) - xm.shards[i].queue = list.New() - xm.shards[i].xm = xm + newRanges: make(chan *pb.AssignedIds, 10), + writer: kv.NewWriteBatch(), + block: new(block), } go func() { zc := pb.NewZeroClient(zero) @@ -110,7 +80,8 @@ func New(kv *badger.DB, zero *grpc.ClientConn, opt Options) *XidMap { backoff := initBackoff for { ctx, cancel := context.WithTimeout(context.Background(), time.Second) - assigned, err := zc.AssignUids(ctx, &pb.Num{Val: 10000}) + assigned, err := zc.AssignUids(ctx, &pb.Num{Val: 1e5}) + glog.V(1).Infof("Assigned Uids: %+v. Err: %v", assigned, err) cancel() if err == nil { backoff = initBackoff @@ -130,104 +101,61 @@ func New(kv *badger.DB, zero *grpc.ClientConn, opt Options) *XidMap { } // AssignUid creates new or looks up existing XID to UID mappings. -func (m *XidMap) AssignUid(xid string) (uid uint64, isNew bool) { - fp := farm.Fingerprint64([]byte(xid)) - idx := fp % uint64(m.opt.NumShards) - sh := &m.shards[idx] - - sh.Lock() - defer sh.Unlock() - - var ok bool - uid, ok = sh.lookup(xid) +func (m *XidMap) AssignUid(xid string) (uid uint64) { + val, ok := m.uidMap.Load(xid) if ok { - return uid, false + return val.(uint64) } - - x.Check(m.kv.View(func(txn *badger.Txn) error { - item, err := txn.Get([]byte(xid)) - if err == badger.ErrKeyNotFound { - return nil + // TODO: Load up the xid->uid map upfront. + + // fp := farm.Fingerprint64([]byte(xid)) + // idx := fp % uint64(m.opt.NumShards) + // sh := &m.shards[idx] + + // sh.Lock() + // defer sh.Unlock() + + // var ok bool + // uid, ok = sh.lookup(xid) + // if ok { + // return uid, false + // } + + // x.Check(m.kv.View(func(txn *badger.Txn) error { + // item, err := txn.Get([]byte(xid)) + // if err == badger.ErrKeyNotFound { + // return nil + // } + // x.Check(err) + // return item.Value(func(uidBuf []byte) error { + // x.AssertTrue(len(uidBuf) > 0) + // var n int + // uid, n = binary.Uvarint(uidBuf) + // x.AssertTrue(n == len(uidBuf)) + // ok = true + // return nil + // }) + // })) + // if ok { + // sh.add(xid, uid, true) + // return uid, false + // } + + uid = m.block.assign(m.newRanges) + val, loaded := m.uidMap.LoadOrStore(xid, uid) + uid = val.(uint64) + if !loaded { + // This uid was stored. + var uidBuf [binary.MaxVarintLen64]byte + n := binary.PutUvarint(uidBuf[:], uid) + if err := m.writer.Set([]byte(xid), uidBuf[:n], 0); err != nil { + panic(err) } - x.Check(err) - return item.Value(func(uidBuf []byte) error { - x.AssertTrue(len(uidBuf) > 0) - var n int - uid, n = binary.Uvarint(uidBuf) - x.AssertTrue(n == len(uidBuf)) - ok = true - return nil - }) - })) - if ok { - sh.add(xid, uid, true) - return uid, false } - - uid = sh.assign(m.newRanges) - sh.add(xid, uid, false) - return uid, true + return uid } // AllocateUid gives a single uid without creating an xid to uid mapping. func (m *XidMap) AllocateUid() uint64 { - m.noMapMu.Lock() - defer m.noMapMu.Unlock() - return m.noMap.assign(m.newRanges) -} - -func (s *shard) lookup(xid string) (uint64, bool) { - elem, ok := s.elems[xid] - if ok { - s.queue.MoveToBack(elem) - return elem.Value.(*mapping).uid, true - } - if uid, ok := s.beingEvicted[xid]; ok { - s.add(xid, uid, true) - return uid, true - } - return 0, false -} - -func (s *shard) add(xid string, uid uint64, persisted bool) { - lruSizePerShard := s.xm.opt.LRUSize / s.xm.opt.NumShards - if s.queue.Len() >= lruSizePerShard && len(s.beingEvicted) == 0 { - s.evict(0.5) - } - - m := &mapping{ - xid: xid, - uid: uid, - persisted: persisted, - } - elem := s.queue.PushBack(m) - s.elems[xid] = elem -} - -func (m *XidMap) EvictAll() { - for i := range make([]struct{}, len(m.shards)) { - m.shards[i].Lock() - m.shards[i].evict(1.0) - m.shards[i].Unlock() - } -} - -func (s *shard) evict(ratio float64) { - evict := int(float64(s.queue.Len()) * ratio) - s.beingEvicted = make(map[string]uint64) - txn := s.xm.kv.NewTransaction(true) - defer txn.Discard() - for i := 0; i < evict; i++ { - m := s.queue.Remove(s.queue.Front()).(*mapping) - delete(s.elems, m.xid) - s.beingEvicted[m.xid] = m.uid - if !m.persisted { - var uidBuf [binary.MaxVarintLen64]byte - n := binary.PutUvarint(uidBuf[:], m.uid) - txn.Set([]byte(m.xid), uidBuf[:n]) - } - - } - x.Check(txn.Commit()) - s.beingEvicted = nil + return m.block.assign(m.newRanges) } From febc181b5d46bb63bc95492c6fd7a932a29c5a52 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 8 Feb 2019 16:06:19 -0800 Subject: [PATCH 02/11] Add the test and benchmark for xid to uid map --- xidmap/xidmap_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 xidmap/xidmap_test.go diff --git a/xidmap/xidmap_test.go b/xidmap/xidmap_test.go new file mode 100644 index 00000000000..b9c8b3efb4b --- /dev/null +++ b/xidmap/xidmap_test.go @@ -0,0 +1,75 @@ +package xidmap + +import ( + "fmt" + "io/ioutil" + "os" + "sync/atomic" + "testing" + + "github.com/dgraph-io/badger" + "github.com/dgraph-io/dgraph/x" + "github.com/stretchr/testify/require" +) + +// Opens a badger db and runs a a test on it. +func runTest(t *testing.T, test func(t *testing.T, xidmap *XidMap)) { + dir, err := ioutil.TempDir(".", "badger-test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + opt := badger.LSMOnlyOptions + opt.Dir = dir + opt.ValueDir = dir + + db, err := badger.Open(opt) + require.NoError(t, err) + defer db.Close() + + conn, err := x.SetupConnection("localhost:5080", nil, false) + require.NoError(t, err) + require.NotNil(t, conn) + + xidmap := New(db, conn) + test(t, xidmap) +} + +func TestXidmap(t *testing.T) { + runTest(t, func(t *testing.T, xidmap *XidMap) { + uida := xidmap.AssignUid("a") + require.Equal(t, uida, xidmap.AssignUid("a")) + + uidb := xidmap.AssignUid("b") + require.True(t, uida != uidb) + require.Equal(t, uidb, xidmap.AssignUid("b")) + }) +} + +func BenchmarkXidmap(b *testing.B) { + dir, err := ioutil.TempDir(".", "badger-test") + x.Check(err) + defer os.RemoveAll(dir) + + opt := badger.LSMOnlyOptions + opt.Dir = dir + opt.ValueDir = dir + + db, err := badger.Open(opt) + x.Check(err) + defer db.Close() + + conn, err := x.SetupConnection("localhost:5080", nil, false) + x.Check(err) + x.AssertTrue(conn != nil) + + var counter uint64 + xidmap := New(db, conn) + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + xid := atomic.AddUint64(&counter, 1) + xidmap.AssignUid(fmt.Sprintf("xid-%d", xid)) + } + }) +} From d1756571032919d9655e7d35f22f42c622e6e675 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 9 Feb 2019 18:49:24 -0800 Subject: [PATCH 03/11] Working code with decreased memory usage. Includes a new BumpUp API. --- xidmap/xidmap.go | 169 +++++++++++++++++++++++++----------------- xidmap/xidmap_test.go | 88 +++++++++++++++++----- 2 files changed, 173 insertions(+), 84 deletions(-) diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index cf20d7b1930..e202d93a99b 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -34,25 +34,21 @@ import ( // manner. It's memory friendly because the mapping is stored on disk, but fast // because it uses an LRU cache. type XidMap struct { - sync.Mutex - kv *badger.DB - // opt Options - newRanges chan *pb.AssignedIds - uidMap sync.Map - writer *badger.WriteBatch - - block *block + sync.RWMutex // protects uidMap and block. + newRanges chan *pb.AssignedIds + uidMap map[string]uint64 + block *block + + // Optionally, these can be set to persist the mappings. + writer *badger.WriteBatch } type block struct { - sync.Mutex start, end uint64 } +// This must already have a write lock. func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 { - b.Lock() - defer b.Unlock() - if b.end == 0 || b.start > b.end { newRange := <-ch b.start, b.end = newRange.StartId, newRange.EndId @@ -63,16 +59,44 @@ func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 { return uid } -// New creates an XidMap with given badger and uid provider. -func New(kv *badger.DB, zero *grpc.ClientConn) *XidMap { - // x.AssertTrue(opt.LRUSize != 0) - // x.AssertTrue(opt.NumShards != 0) +// New creates an XidMap. zero conn must be valid for UID allocations to happen. Optionally, a +// badger.DB can be provided to persist the xid to uid allocations. This would add latency to the +// assignment operations. +func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { xm := &XidMap{ - kv: kv, newRanges: make(chan *pb.AssignedIds, 10), - writer: kv.NewWriteBatch(), block: new(block), + uidMap: make(map[string]uint64), + } + if db != nil { + // If DB is provided, let's load up all the xid -> uid mappings in memory. + xm.writer = db.NewWriteBatch() + + err := db.View(func(txn *badger.Txn) error { + var count int + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = false + itr := txn.NewIterator(opt) + defer itr.Close() + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + key := string(item.Key()) + err := item.Value(func(val []byte) error { + uid := binary.BigEndian.Uint64(val) + xm.uidMap[key] = uid + return nil + }) + if err != nil { + return err + } + count++ + } + glog.Infof("Loaded up %d xid to uid mappings", count) + return nil + }) + x.Check(err) } + go func() { zc := pb.NewZeroClient(zero) const initBackoff = 10 * time.Millisecond @@ -95,67 +119,78 @@ func New(kv *badger.DB, zero *grpc.ClientConn) *XidMap { } time.Sleep(backoff) } - }() return xm } // AssignUid creates new or looks up existing XID to UID mappings. -func (m *XidMap) AssignUid(xid string) (uid uint64) { - val, ok := m.uidMap.Load(xid) - if ok { - return val.(uint64) +func (m *XidMap) AssignUid(xid string) uint64 { + m.RLock() + uid := m.uidMap[xid] + m.RUnlock() + if uid > 0 { + return uid } - // TODO: Load up the xid->uid map upfront. - - // fp := farm.Fingerprint64([]byte(xid)) - // idx := fp % uint64(m.opt.NumShards) - // sh := &m.shards[idx] - - // sh.Lock() - // defer sh.Unlock() - - // var ok bool - // uid, ok = sh.lookup(xid) - // if ok { - // return uid, false - // } - - // x.Check(m.kv.View(func(txn *badger.Txn) error { - // item, err := txn.Get([]byte(xid)) - // if err == badger.ErrKeyNotFound { - // return nil - // } - // x.Check(err) - // return item.Value(func(uidBuf []byte) error { - // x.AssertTrue(len(uidBuf) > 0) - // var n int - // uid, n = binary.Uvarint(uidBuf) - // x.AssertTrue(n == len(uidBuf)) - // ok = true - // return nil - // }) - // })) - // if ok { - // sh.add(xid, uid, true) - // return uid, false - // } - - uid = m.block.assign(m.newRanges) - val, loaded := m.uidMap.LoadOrStore(xid, uid) - uid = val.(uint64) - if !loaded { - // This uid was stored. - var uidBuf [binary.MaxVarintLen64]byte - n := binary.PutUvarint(uidBuf[:], uid) - if err := m.writer.Set([]byte(xid), uidBuf[:n], 0); err != nil { + + m.Lock() + defer m.Unlock() + + uid = m.uidMap[xid] + if uid > 0 { + return uid + } + + newUid := m.block.assign(m.newRanges) + m.uidMap[xid] = newUid + + if m.writer != nil { + var uidBuf [8]byte + binary.BigEndian.PutUint64(uidBuf[:], newUid) + if err := m.writer.Set([]byte(xid), uidBuf[:], 0); err != nil { panic(err) } } - return uid + return newUid +} + +// BumpUp can be used to make Zero allocate UIDs up to this given number. +func (m *XidMap) BumpUp(uid uint64) { + m.RLock() + end := m.block.end + m.RUnlock() + if uid <= end { + return + } + + m.Lock() + defer m.Unlock() + + b := m.block + for { + if uid < b.start { + return + } + if uid == b.start { + b.start++ + return + } + if uid < b.end { + b.start = uid + 1 + return + } + newRange := <-m.newRanges + b.start, b.end = newRange.StartId, newRange.EndId + } } // AllocateUid gives a single uid without creating an xid to uid mapping. func (m *XidMap) AllocateUid() uint64 { + m.Lock() + defer m.Unlock() return m.block.assign(m.newRanges) } + +// Flush must be called if DB is provided to XidMap. +func (m *XidMap) Flush() error { + return m.writer.Flush() +} diff --git a/xidmap/xidmap_test.go b/xidmap/xidmap_test.go index b9c8b3efb4b..1e139f1e79e 100644 --- a/xidmap/xidmap_test.go +++ b/xidmap/xidmap_test.go @@ -4,8 +4,11 @@ import ( "fmt" "io/ioutil" "os" + "runtime" + "sync" "sync/atomic" "testing" + "time" "github.com/dgraph-io/badger" "github.com/dgraph-io/dgraph/x" @@ -13,7 +16,7 @@ import ( ) // Opens a badger db and runs a a test on it. -func runTest(t *testing.T, test func(t *testing.T, xidmap *XidMap)) { +func withDB(t *testing.T, test func(db *badger.DB)) { dir, err := ioutil.TempDir(".", "badger-test") require.NoError(t, err) defer os.RemoveAll(dir) @@ -26,44 +29,95 @@ func runTest(t *testing.T, test func(t *testing.T, xidmap *XidMap)) { require.NoError(t, err) defer db.Close() + test(db) +} + +func TestXidmap(t *testing.T) { conn, err := x.SetupConnection("localhost:5080", nil, false) require.NoError(t, err) require.NotNil(t, conn) - xidmap := New(db, conn) - test(t, xidmap) -} + withDB(t, func(db *badger.DB) { + xidmap := New(conn, db) -func TestXidmap(t *testing.T) { - runTest(t, func(t *testing.T, xidmap *XidMap) { uida := xidmap.AssignUid("a") require.Equal(t, uida, xidmap.AssignUid("a")) uidb := xidmap.AssignUid("b") require.True(t, uida != uidb) require.Equal(t, uidb, xidmap.AssignUid("b")) + + to := xidmap.block.end + 1e6 + 3 + xidmap.BumpUp(to) + uid := xidmap.AllocateUid() + t.Logf("bump up to: %d. allocated: %d", to, uid) + require.True(t, uid > to, "to: %d. Got: %d", to, uid) + + require.NoError(t, xidmap.Flush()) + xidmap = nil + + xidmap2 := New(conn, db) + require.Equal(t, uida, xidmap2.AssignUid("a")) + require.Equal(t, uidb, xidmap2.AssignUid("b")) }) } -func BenchmarkXidmap(b *testing.B) { - dir, err := ioutil.TempDir(".", "badger-test") - x.Check(err) - defer os.RemoveAll(dir) +func TestXidmapMemory(t *testing.T) { + var loop uint32 + bToMb := func(b uint64) uint64 { + return b / 1024 / 1024 + } + printMemory := func() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + // For info on each, see: https://golang.org/pkg/runtime/#MemStats + fmt.Printf(" Heap = %v M", bToMb(m.HeapInuse)) + fmt.Printf(" Alloc = %v M", bToMb(m.Alloc)) + fmt.Printf(" Sys = %v M", bToMb(m.Sys)) + fmt.Printf(" Loop = %.2fM", float64(atomic.LoadUint32(&loop))/1e6) + fmt.Printf(" NumGC = %v\n", m.NumGC) + } + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - opt := badger.LSMOnlyOptions - opt.Dir = dir - opt.ValueDir = dir + for range ticker.C { + printMemory() + } + }() - db, err := badger.Open(opt) - x.Check(err) - defer db.Close() + conn, err := x.SetupConnection("localhost:5080", nil, false) + require.NoError(t, err) + require.NotNil(t, conn) + + xidmap := New(conn, nil) + start := time.Now() + var wg sync.WaitGroup + for numGo := 0; numGo < 32; numGo++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + i := atomic.AddUint32(&loop, 1) + if i > 50e6 { + return + } + xidmap.AssignUid(fmt.Sprintf("xid-%d", i)) + } + }() + } + wg.Wait() + t.Logf("Time taken: %v", time.Since(start).Round(time.Millisecond)) +} + +func BenchmarkXidmap(b *testing.B) { conn, err := x.SetupConnection("localhost:5080", nil, false) x.Check(err) x.AssertTrue(conn != nil) var counter uint64 - xidmap := New(db, conn) + xidmap := New(conn, nil) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { From b85b81def7ab62eff26291ff4dea9b12998ad8c6 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 9 Feb 2019 19:23:42 -0800 Subject: [PATCH 04/11] Working live loader, which can optionally just keep all the mapping in memory. --- dgraph/cmd/bulk/loader.go | 3 ++- dgraph/cmd/live/batch.go | 2 +- dgraph/cmd/live/run.go | 42 +++++++++++++++++++-------------------- xidmap/xidmap.go | 7 +++++-- xidmap/xidmap_test.go | 2 +- 5 files changed, 29 insertions(+), 27 deletions(-) diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 711748d08d1..e65ea7a9b14 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -147,6 +147,7 @@ func readSchema(filename string) []*pb.SchemaUpdate { func (ld *loader) mapStage() { ld.prog.setPhase(mapPhase) + // TODO: Consider if we need to always store the XIDs in Badger. Things slow down if we do. xidDir := filepath.Join(ld.opt.TmpDir, "xids") x.Check(os.Mkdir(xidDir, 0755)) opt := badger.DefaultOptions @@ -157,7 +158,7 @@ func (ld *loader) mapStage() { var err error ld.xidDB, err = badger.Open(opt) x.Check(err) - ld.xids = xidmap.New(ld.xidDB, ld.zero) + ld.xids = xidmap.New(ld.zero, ld.xidDB) var dir, ext string var loaderType int diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go index 5ef92ba9209..7f73d3dbb61 100644 --- a/dgraph/cmd/live/batch.go +++ b/dgraph/cmd/live/batch.go @@ -63,7 +63,7 @@ type loader struct { dc *dgo.Dgraph alloc *xidmap.XidMap ticker *time.Ticker - kv *badger.DB + db *badger.DB requestsWg sync.WaitGroup // If we retry a request, we add one to retryRequestsWg. retryRequestsWg sync.WaitGroup diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index 5e9965aabe5..3ab6c831ef0 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -159,6 +159,7 @@ func (l *loader) uid(val string) string { // a user selecting an unassigned UID in this way - it may be assigned // later to another node. It is up to the user to avoid this. if uid, err := strconv.ParseUint(val, 0, 64); err == nil { + l.alloc.BumpTo(uid) return fmt.Sprintf("%#x", uid) } @@ -239,32 +240,32 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk } func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader { - x.Check(os.MkdirAll(opt.clientDir, 0700)) - o := badger.DefaultOptions - o.SyncWrites = true // So that checkpoints are persisted immediately. - o.TableLoadingMode = bopt.MemoryMap - o.Dir = opt.clientDir - o.ValueDir = opt.clientDir + var db *badger.DB + if len(opt.clientDir) > 0 { + x.Check(os.MkdirAll(opt.clientDir, 0700)) + o := badger.DefaultOptions + o.SyncWrites = true // So that checkpoints are persisted immediately. + o.TableLoadingMode = bopt.MemoryMap + o.Dir = opt.clientDir + o.ValueDir = opt.clientDir - kv, err := badger.Open(o) - x.Checkf(err, "Error while creating badger KV posting store") + var err error + db, err = badger.Open(o) + x.Checkf(err, "Error while creating badger KV posting store") + } // compression with zero server actually makes things worse connzero, err := x.SetupConnection(opt.zero, &tlsConf, false) x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.zero) - alloc := xidmap.New( - kv, - connzero, - ) - + alloc := xidmap.New(connzero, db) l := &loader{ opts: opts, dc: dc, start: time.Now(), reqs: make(chan api.Mutation, opts.Pending*2), alloc: alloc, - kv: kv, + db: db, zeroconn: connzero, } @@ -316,16 +317,9 @@ func run() error { } dgraphClient := dgo.NewDgraphClient(clients...) - if len(opt.clientDir) == 0 { - var err error - opt.clientDir, err = ioutil.TempDir("", "x") - x.Checkf(err, "Error while trying to create temporary client directory.") - fmt.Printf("Creating temp client directory at %s\n", opt.clientDir) - defer os.RemoveAll(opt.clientDir) - } l := setup(bmOpts, dgraphClient) defer l.zeroconn.Close() - defer l.kv.Close() + defer l.db.Close() if len(opt.schemaFile) > 0 { if err := processSchemaFile(ctx, opt.schemaFile, dgraphClient); err != nil { @@ -390,5 +384,9 @@ func run() error { fmt.Printf("Time spent : %v\n", c.Elapsed) fmt.Printf("N-Quads processed per second : %d\n", rate) + if l.db != nil { + l.alloc.Flush() + l.db.Close() + } return nil } diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index e202d93a99b..ec2555e669e 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -153,8 +153,8 @@ func (m *XidMap) AssignUid(xid string) uint64 { return newUid } -// BumpUp can be used to make Zero allocate UIDs up to this given number. -func (m *XidMap) BumpUp(uid uint64) { +// BumpTo can be used to make Zero allocate UIDs up to this given number. +func (m *XidMap) BumpTo(uid uint64) { m.RLock() end := m.block.end m.RUnlock() @@ -192,5 +192,8 @@ func (m *XidMap) AllocateUid() uint64 { // Flush must be called if DB is provided to XidMap. func (m *XidMap) Flush() error { + if m.writer == nil { + return nil + } return m.writer.Flush() } diff --git a/xidmap/xidmap_test.go b/xidmap/xidmap_test.go index 1e139f1e79e..b0d7b0035d8 100644 --- a/xidmap/xidmap_test.go +++ b/xidmap/xidmap_test.go @@ -48,7 +48,7 @@ func TestXidmap(t *testing.T) { require.Equal(t, uidb, xidmap.AssignUid("b")) to := xidmap.block.end + 1e6 + 3 - xidmap.BumpUp(to) + xidmap.BumpTo(to) uid := xidmap.AllocateUid() t.Logf("bump up to: %d. allocated: %d", to, uid) require.True(t, uid > to, "to: %d. Got: %d", to, uid) From 90d7154cddf28da9ae3953c7706df0faab6967c3 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 9 Feb 2019 19:54:55 -0800 Subject: [PATCH 05/11] Changes found while running --- dgraph/cmd/bulk/loader.go | 2 +- dgraph/cmd/live/run.go | 5 ++--- worker/mutation.go | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index e65ea7a9b14..90a6f18e9e3 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -224,7 +224,7 @@ func (ld *loader) mapStage() { for i := range ld.mappers { ld.mappers[i] = nil } - // ld.xids.EvictAll() + x.Check(ld.xids.Flush()) x.Check(ld.xidDB.Close()) ld.xids = nil runtime.GC() diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index 3ab6c831ef0..081a52d0e54 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -244,10 +244,10 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader { if len(opt.clientDir) > 0 { x.Check(os.MkdirAll(opt.clientDir, 0700)) o := badger.DefaultOptions - o.SyncWrites = true // So that checkpoints are persisted immediately. - o.TableLoadingMode = bopt.MemoryMap o.Dir = opt.clientDir o.ValueDir = opt.clientDir + o.TableLoadingMode = bopt.MemoryMap + o.SyncWrites = false var err error db, err = badger.Open(o) @@ -319,7 +319,6 @@ func run() error { l := setup(bmOpts, dgraphClient) defer l.zeroconn.Close() - defer l.db.Close() if len(opt.schemaFile) > 0 { if err := processSchemaFile(ctx, opt.schemaFile, dgraphClient); err != nil { diff --git a/worker/mutation.go b/worker/mutation.go index 362f55e2d83..2900fa33fb4 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -290,7 +290,7 @@ func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error { return x.Errorf("Input for predicate %s of type uid is scalar", edge.Attr) case schemaType.IsScalar() && !storageType.IsScalar(): - return x.Errorf("Input for predicate %s of type scalar is uid", edge.Attr) + return x.Errorf("Input for predicate %s of type scalar is uid. Edge: %v", edge.Attr, edge) // The suggested storage type matches the schema, OK! case storageType == schemaType && schemaType != types.DefaultID: From af490906ce7131bb597081c7681a027e134bb113 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 9 Feb 2019 20:30:46 -0800 Subject: [PATCH 06/11] Adding shards back to XidMap speed up operations by a huge factor. Benchmark shows each allocation is 300ns. --- xidmap/xidmap.go | 94 +++++++++++++++++++++++++------------------ xidmap/xidmap_test.go | 2 +- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index ec2555e669e..7e89f4e0f22 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -19,6 +19,8 @@ package xidmap import ( "context" "encoding/binary" + "math/rand" + "runtime" "sync" "time" @@ -27,6 +29,7 @@ import ( "github.com/dgraph-io/badger" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + farm "github.com/dgryski/go-farm" "github.com/golang/glog" ) @@ -34,15 +37,20 @@ import ( // manner. It's memory friendly because the mapping is stored on disk, but fast // because it uses an LRU cache. type XidMap struct { - sync.RWMutex // protects uidMap and block. - newRanges chan *pb.AssignedIds - uidMap map[string]uint64 - block *block + shards []*shard + newRanges chan *pb.AssignedIds // Optionally, these can be set to persist the mappings. writer *badger.WriteBatch } +type shard struct { + sync.RWMutex + block + + uidMap map[string]uint64 +} + type block struct { start, end uint64 } @@ -65,8 +73,12 @@ func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 { func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { xm := &XidMap{ newRanges: make(chan *pb.AssignedIds, 10), - block: new(block), - uidMap: make(map[string]uint64), + shards: make([]*shard, runtime.GOMAXPROCS(0)), + } + for i := range xm.shards { + xm.shards[i] = &shard{ + uidMap: make(map[string]uint64), + } } if db != nil { // If DB is provided, let's load up all the xid -> uid mappings in memory. @@ -81,9 +93,11 @@ func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { for itr.Rewind(); itr.Valid(); itr.Next() { item := itr.Item() key := string(item.Key()) + sh := xm.shardFor(key) err := item.Value(func(val []byte) error { uid := binary.BigEndian.Uint64(val) - xm.uidMap[key] = uid + // No need to acquire a lock. This is all serial access. + sh.uidMap[key] = uid return nil }) if err != nil { @@ -123,25 +137,32 @@ func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { return xm } +func (m *XidMap) shardFor(xid string) *shard { + fp := farm.Fingerprint32([]byte(xid)) + idx := fp % uint32(len(m.shards)) + return m.shards[idx] +} + // AssignUid creates new or looks up existing XID to UID mappings. func (m *XidMap) AssignUid(xid string) uint64 { - m.RLock() - uid := m.uidMap[xid] - m.RUnlock() + sh := m.shardFor(xid) + sh.RLock() + uid := sh.uidMap[xid] + sh.RUnlock() if uid > 0 { return uid } - m.Lock() - defer m.Unlock() + sh.Lock() + defer sh.Unlock() - uid = m.uidMap[xid] + uid = sh.uidMap[xid] if uid > 0 { return uid } - newUid := m.block.assign(m.newRanges) - m.uidMap[xid] = newUid + newUid := sh.assign(m.newRanges) + sh.uidMap[xid] = newUid if m.writer != nil { var uidBuf [8]byte @@ -153,41 +174,34 @@ func (m *XidMap) AssignUid(xid string) uint64 { return newUid } -// BumpTo can be used to make Zero allocate UIDs up to this given number. -func (m *XidMap) BumpTo(uid uint64) { - m.RLock() - end := m.block.end - m.RUnlock() - if uid <= end { - return - } - - m.Lock() - defer m.Unlock() +func (sh *shard) End() uint64 { + sh.RLock() + defer sh.RUnlock() + return sh.end +} - b := m.block - for { - if uid < b.start { - return - } - if uid == b.start { - b.start++ +// BumpTo can be used to make Zero allocate UIDs up to this given number. No guarantees are made +// about the next UID allocated by XidMap. It can be lower or higher than the uid given here. +func (m *XidMap) BumpTo(uid uint64) { + for _, sh := range m.shards { + if uid <= sh.End() { return } - if uid < b.end { - b.start = uid + 1 + } + for { + r := <-m.newRanges + if uid <= r.EndId { return } - newRange := <-m.newRanges - b.start, b.end = newRange.StartId, newRange.EndId } } // AllocateUid gives a single uid without creating an xid to uid mapping. func (m *XidMap) AllocateUid() uint64 { - m.Lock() - defer m.Unlock() - return m.block.assign(m.newRanges) + sh := m.shards[rand.Intn(len(m.shards))] + sh.Lock() + defer sh.Unlock() + return sh.assign(m.newRanges) } // Flush must be called if DB is provided to XidMap. diff --git a/xidmap/xidmap_test.go b/xidmap/xidmap_test.go index b0d7b0035d8..bb2dc4169a9 100644 --- a/xidmap/xidmap_test.go +++ b/xidmap/xidmap_test.go @@ -47,7 +47,7 @@ func TestXidmap(t *testing.T) { require.True(t, uida != uidb) require.Equal(t, uidb, xidmap.AssignUid("b")) - to := xidmap.block.end + 1e6 + 3 + to := xidmap.AllocateUid() + uint64(1e6+3) xidmap.BumpTo(to) uid := xidmap.AllocateUid() t.Logf("bump up to: %d. allocated: %d", to, uid) From a698ef586641f9deae93cc4dfeb21603dc1594bc Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 9 Feb 2019 21:08:31 -0800 Subject: [PATCH 07/11] Make BumpTo much faster by calling Zero directly, instead of looping through the newRanges channel. --- xidmap/xidmap.go | 33 ++++++++++++++++++++++++++------- xidmap/xidmap_test.go | 3 +-- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index 7e89f4e0f22..fafe0a21fc6 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -39,6 +39,7 @@ import ( type XidMap struct { shards []*shard newRanges chan *pb.AssignedIds + zc pb.ZeroClient // Optionally, these can be set to persist the mappings. writer *badger.WriteBatch @@ -110,15 +111,15 @@ func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { }) x.Check(err) } + xm.zc = pb.NewZeroClient(zero) go func() { - zc := pb.NewZeroClient(zero) const initBackoff = 10 * time.Millisecond const maxBackoff = 5 * time.Second backoff := initBackoff for { ctx, cancel := context.WithTimeout(context.Background(), time.Second) - assigned, err := zc.AssignUids(ctx, &pb.Num{Val: 1e5}) + assigned, err := xm.zc.AssignUids(ctx, &pb.Num{Val: 1e5}) glog.V(1).Infof("Assigned Uids: %+v. Err: %v", assigned, err) cancel() if err == nil { @@ -174,25 +175,43 @@ func (m *XidMap) AssignUid(xid string) uint64 { return newUid } -func (sh *shard) End() uint64 { +func (sh *shard) Current() uint64 { sh.RLock() defer sh.RUnlock() - return sh.end + return sh.start } -// BumpTo can be used to make Zero allocate UIDs up to this given number. No guarantees are made -// about the next UID allocated by XidMap. It can be lower or higher than the uid given here. +// BumpTo can be used to make Zero allocate UIDs up to this given number. Attempts are made to +// ensure all future allocations of UIDs be higher than this one, but result is not guaranteed. func (m *XidMap) BumpTo(uid uint64) { for _, sh := range m.shards { - if uid <= sh.End() { + if uid <= sh.Current() { return } } + defer func() { + for _, sh := range m.shards { + sh.Lock() + sh.end = 0 + sh.Unlock() + } + }() + for { r := <-m.newRanges if uid <= r.EndId { return } + num := uid - r.EndId + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + assigned, err := m.zc.AssignUids(ctx, &pb.Num{Val: num}) + cancel() + if err == nil { + glog.V(1).Infof("Requested bump: %d. Discarding assigned: %v", uid, assigned) + return + } else { + glog.Errorf("While requesting AssignUids(%d): %v", num, err) + } } } diff --git a/xidmap/xidmap_test.go b/xidmap/xidmap_test.go index bb2dc4169a9..c70e3db88f7 100644 --- a/xidmap/xidmap_test.go +++ b/xidmap/xidmap_test.go @@ -49,9 +49,8 @@ func TestXidmap(t *testing.T) { to := xidmap.AllocateUid() + uint64(1e6+3) xidmap.BumpTo(to) - uid := xidmap.AllocateUid() + uid := xidmap.AllocateUid() // Does not have to be above the bump. t.Logf("bump up to: %d. allocated: %d", to, uid) - require.True(t, uid > to, "to: %d. Got: %d", to, uid) require.NoError(t, xidmap.Flush()) xidmap = nil From 21feaa2e3fb910afa4eabb29b180200d5f46f769 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 9 Feb 2019 21:43:50 -0800 Subject: [PATCH 08/11] Reassign all shards after bumping up to a UID. --- dgraph/cmd/live/batch.go | 9 ++++++--- xidmap/xidmap.go | 18 +++++++++++++----- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go index 7f73d3dbb61..a440c39cd71 100644 --- a/dgraph/cmd/live/batch.go +++ b/dgraph/cmd/live/batch.go @@ -188,15 +188,18 @@ func (l *loader) makeRequests() { } func (l *loader) printCounters() { - l.ticker = time.NewTicker(2 * time.Second) + period := 5 * time.Second + l.ticker = time.NewTicker(period) start := time.Now() + var last Counter for range l.ticker.C { counter := l.Counter() - rate := float64(counter.Nquads) / counter.Elapsed.Seconds() + rate := float64(counter.Nquads-last.Nquads) / period.Seconds() elapsed := time.Since(start).Round(time.Second) - fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/sec: %5.0f Aborts: %d\n", + fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n", elapsed, counter.TxnsDone, counter.Nquads, rate, counter.Aborts) + last = counter } } diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index fafe0a21fc6..b2a20b833d2 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -20,7 +20,6 @@ import ( "context" "encoding/binary" "math/rand" - "runtime" "sync" "time" @@ -72,9 +71,10 @@ func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 { // badger.DB can be provided to persist the xid to uid allocations. This would add latency to the // assignment operations. func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { + numShards := 32 xm := &XidMap{ - newRanges: make(chan *pb.AssignedIds, 10), - shards: make([]*shard, runtime.GOMAXPROCS(0)), + newRanges: make(chan *pb.AssignedIds, numShards), + shards: make([]*shard, numShards), } for i := range xm.shards { xm.shards[i] = &shard{ @@ -119,7 +119,7 @@ func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { backoff := initBackoff for { ctx, cancel := context.WithTimeout(context.Background(), time.Second) - assigned, err := xm.zc.AssignUids(ctx, &pb.Num{Val: 1e5}) + assigned, err := xm.zc.AssignUids(ctx, &pb.Num{Val: 1e4}) glog.V(1).Infof("Assigned Uids: %+v. Err: %v", assigned, err) cancel() if err == nil { @@ -190,14 +190,17 @@ func (m *XidMap) BumpTo(uid uint64) { } } defer func() { + // Reset all the shards. for _, sh := range m.shards { sh.Lock() sh.end = 0 + sh.assign(m.newRanges) sh.Unlock() } }() for { + glog.V(1).Infof("Bumping up to %v", uid) r := <-m.newRanges if uid <= r.EndId { return @@ -207,7 +210,12 @@ func (m *XidMap) BumpTo(uid uint64) { assigned, err := m.zc.AssignUids(ctx, &pb.Num{Val: num}) cancel() if err == nil { - glog.V(1).Infof("Requested bump: %d. Discarding assigned: %v", uid, assigned) + glog.V(1).Infof("Requested bump: %d. Got assigned: %v", uid, assigned) + if assigned.EndId-uid >= 1e4 { + assigned.StartId = uid + 1 + go func() { m.newRanges <- assigned }() + glog.V(1).Infof("Reusing assigned as: %v", assigned) + } return } else { glog.Errorf("While requesting AssignUids(%d): %v", num, err) From 0c3a23308330b438263663fe592ecfad59cca9c2 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 9 Feb 2019 22:08:02 -0800 Subject: [PATCH 09/11] Improve how BumpTo() happens by using a maxSeenUid variable. --- xidmap/xidmap.go | 47 +++++++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index b2a20b833d2..dc537fab944 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "math/rand" "sync" + "sync/atomic" "time" "google.golang.org/grpc" @@ -36,9 +37,10 @@ import ( // manner. It's memory friendly because the mapping is stored on disk, but fast // because it uses an LRU cache. type XidMap struct { - shards []*shard - newRanges chan *pb.AssignedIds - zc pb.ZeroClient + shards []*shard + newRanges chan *pb.AssignedIds + zc pb.ZeroClient + maxUidSeen uint64 // Optionally, these can be set to persist the mappings. writer *badger.WriteBatch @@ -123,6 +125,7 @@ func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { glog.V(1).Infof("Assigned Uids: %+v. Err: %v", assigned, err) cancel() if err == nil { + xm.updateMaxSeen(assigned.EndId) backoff = initBackoff xm.newRanges <- assigned continue @@ -181,41 +184,33 @@ func (sh *shard) Current() uint64 { return sh.start } +func (m *XidMap) updateMaxSeen(max uint64) { + for { + prev := atomic.LoadUint64(&m.maxUidSeen) + if prev >= max { + return + } + atomic.CompareAndSwapUint64(&m.maxUidSeen, prev, max) + } +} + // BumpTo can be used to make Zero allocate UIDs up to this given number. Attempts are made to // ensure all future allocations of UIDs be higher than this one, but result is not guaranteed. func (m *XidMap) BumpTo(uid uint64) { - for _, sh := range m.shards { - if uid <= sh.Current() { - return - } + curMax := atomic.LoadUint64(&m.maxUidSeen) + if uid <= curMax { + return } - defer func() { - // Reset all the shards. - for _, sh := range m.shards { - sh.Lock() - sh.end = 0 - sh.assign(m.newRanges) - sh.Unlock() - } - }() for { glog.V(1).Infof("Bumping up to %v", uid) - r := <-m.newRanges - if uid <= r.EndId { - return - } - num := uid - r.EndId + num := x.Max(uid-curMax, 1e4) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) assigned, err := m.zc.AssignUids(ctx, &pb.Num{Val: num}) cancel() if err == nil { glog.V(1).Infof("Requested bump: %d. Got assigned: %v", uid, assigned) - if assigned.EndId-uid >= 1e4 { - assigned.StartId = uid + 1 - go func() { m.newRanges <- assigned }() - glog.V(1).Infof("Reusing assigned as: %v", assigned) - } + m.updateMaxSeen(assigned.EndId) return } else { glog.Errorf("While requesting AssignUids(%d): %v", num, err) From 071826d03514d3080fae0a7a7038e9fcec7d47a0 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 11 Feb 2019 14:11:18 -0800 Subject: [PATCH 10/11] Martin's review --- xidmap/xidmap.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index dc537fab944..351dcbd1d55 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -57,7 +57,7 @@ type block struct { start, end uint64 } -// This must already have a write lock. +// assign assumes the write lock is already acquired. func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 { if b.end == 0 || b.start > b.end { newRange := <-ch @@ -195,7 +195,7 @@ func (m *XidMap) updateMaxSeen(max uint64) { } // BumpTo can be used to make Zero allocate UIDs up to this given number. Attempts are made to -// ensure all future allocations of UIDs be higher than this one, but result is not guaranteed. +// ensure all future allocations of UIDs be higher than this one, but results are not guaranteed. func (m *XidMap) BumpTo(uid uint64) { curMax := atomic.LoadUint64(&m.maxUidSeen) if uid <= curMax { From 7825041ea0ec64ab9e0d43de469b6c0830fb1fe7 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 11 Feb 2019 15:45:27 -0800 Subject: [PATCH 11/11] Martin review --- xidmap/xidmap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index 351dcbd1d55..8ebb67524be 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -125,8 +125,8 @@ func New(zero *grpc.ClientConn, db *badger.DB) *XidMap { glog.V(1).Infof("Assigned Uids: %+v. Err: %v", assigned, err) cancel() if err == nil { - xm.updateMaxSeen(assigned.EndId) backoff = initBackoff + xm.updateMaxSeen(assigned.EndId) xm.newRanges <- assigned continue }