diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go
index a9bce8dcf6b..90a6f18e9e3 100644
--- a/dgraph/cmd/bulk/loader.go
+++ b/dgraph/cmd/bulk/loader.go
@@ -147,6 +147,7 @@ func readSchema(filename string) []*pb.SchemaUpdate {
 func (ld *loader) mapStage() {
 	ld.prog.setPhase(mapPhase)
 
+	// TODO: Consider if we need to always store the XIDs in Badger. Things slow down if we do.
 	xidDir := filepath.Join(ld.opt.TmpDir, "xids")
 	x.Check(os.Mkdir(xidDir, 0755))
 	opt := badger.DefaultOptions
@@ -157,10 +158,7 @@ func (ld *loader) mapStage() {
 	var err error
 	ld.xidDB, err = badger.Open(opt)
 	x.Check(err)
-	ld.xids = xidmap.New(ld.xidDB, ld.zero, xidmap.Options{
-		NumShards: 1 << 10,
-		LRUSize:   1 << 19,
-	})
+	ld.xids = xidmap.New(ld.zero, ld.xidDB)
 
 	var dir, ext string
 	var loaderType int
@@ -226,7 +224,7 @@ func (ld *loader) mapStage() {
 	for i := range ld.mappers {
 		ld.mappers[i] = nil
 	}
-	ld.xids.EvictAll()
+	x.Check(ld.xids.Flush())
 	x.Check(ld.xidDB.Close())
 	ld.xids = nil
 	runtime.GC()
diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go
index ba4b4e53a38..01844388b2f 100644
--- a/dgraph/cmd/bulk/mapper.go
+++ b/dgraph/cmd/bulk/mapper.go
@@ -217,8 +217,8 @@ func (m *mapper) processNQuad(nq gql.NQuad) {
 }
 
 func (m *mapper) lookupUid(xid string) uint64 {
-	uid, isNew := m.xids.AssignUid(xid)
-	if !isNew || !m.opt.StoreXids {
+	uid := m.xids.AssignUid(xid)
+	if !m.opt.StoreXids {
 		return uid
 	}
 	if strings.HasPrefix(xid, "_:") {
diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go
index 5ef92ba9209..a440c39cd71 100644
--- a/dgraph/cmd/live/batch.go
+++ b/dgraph/cmd/live/batch.go
@@ -63,7 +63,7 @@ type loader struct {
 	dc     *dgo.Dgraph
 	alloc  *xidmap.XidMap
 	ticker *time.Ticker
-	kv     *badger.DB
+	db     *badger.DB
 	requestsWg sync.WaitGroup
 	// If we retry a request, we add one to retryRequestsWg.
 	retryRequestsWg sync.WaitGroup
@@ -188,15 +188,18 @@ func (l *loader) makeRequests() {
 }
 
 func (l *loader) printCounters() {
-	l.ticker = time.NewTicker(2 * time.Second)
+	period := 5 * time.Second
+	l.ticker = time.NewTicker(period)
 	start := time.Now()
 
+	var last Counter
 	for range l.ticker.C {
 		counter := l.Counter()
-		rate := float64(counter.Nquads) / counter.Elapsed.Seconds()
+		rate := float64(counter.Nquads-last.Nquads) / period.Seconds()
 		elapsed := time.Since(start).Round(time.Second)
-		fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/sec: %5.0f Aborts: %d\n",
+		fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n",
 			elapsed, counter.TxnsDone, counter.Nquads, rate, counter.Aborts)
+		last = counter
 	}
 }
 
diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go
index 5882e8f298e..081a52d0e54 100644
--- a/dgraph/cmd/live/run.go
+++ b/dgraph/cmd/live/run.go
@@ -158,13 +158,12 @@ func (l *loader) uid(val string) string {
 	// to be an existing node in the graph. There is limited protection against
 	// a user selecting an unassigned UID in this way - it may be assigned
 	// later to another node. It is up to the user to avoid this.
-	if strings.HasPrefix(val, "0x") {
-		if _, err := strconv.ParseUint(val[2:], 16, 64); err == nil {
-			return val
-		}
+	if uid, err := strconv.ParseUint(val, 0, 64); err == nil {
+		l.alloc.BumpTo(uid)
+		return fmt.Sprintf("%#x", uid)
 	}
-	uid, _ := l.alloc.AssignUid(val)
+	uid := l.alloc.AssignUid(val)
 	return fmt.Sprintf("%#x", uint64(uid))
 }
 
@@ -241,36 +240,32 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
 }
 
 func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader {
-	x.Check(os.MkdirAll(opt.clientDir, 0700))
-	o := badger.DefaultOptions
-	o.SyncWrites = true // So that checkpoints are persisted immediately.
-	o.TableLoadingMode = bopt.MemoryMap
-	o.Dir = opt.clientDir
-	o.ValueDir = opt.clientDir
+	var db *badger.DB
+	if len(opt.clientDir) > 0 {
+		x.Check(os.MkdirAll(opt.clientDir, 0700))
+		o := badger.DefaultOptions
+		o.Dir = opt.clientDir
+		o.ValueDir = opt.clientDir
+		o.TableLoadingMode = bopt.MemoryMap
+		o.SyncWrites = false
 
-	kv, err := badger.Open(o)
-	x.Checkf(err, "Error while creating badger KV posting store")
+		var err error
+		db, err = badger.Open(o)
+		x.Checkf(err, "Error while creating badger KV posting store")
+	}
 
 	// compression with zero server actually makes things worse
 	connzero, err := x.SetupConnection(opt.zero, &tlsConf, false)
 	x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.zero)
 
-	alloc := xidmap.New(
-		kv,
-		connzero,
-		xidmap.Options{
-			NumShards: 100,
-			LRUSize:   1e5,
-		},
-	)
-
+	alloc := xidmap.New(connzero, db)
 	l := &loader{
 		opts:  opts,
 		dc:    dc,
 		start: time.Now(),
 		reqs:  make(chan api.Mutation, opts.Pending*2),
 		alloc: alloc,
-		kv:    kv,
+		db:    db,
 		zeroconn: connzero,
 	}
 
@@ -322,17 +317,8 @@ func run() error {
 	}
 	dgraphClient := dgo.NewDgraphClient(clients...)
 
-	if len(opt.clientDir) == 0 {
-		var err error
-		opt.clientDir, err = ioutil.TempDir("", "x")
-		x.Checkf(err, "Error while trying to create temporary client directory.")
-		fmt.Printf("Creating temp client directory at %s\n", opt.clientDir)
-		defer os.RemoveAll(opt.clientDir)
-	}
 	l := setup(bmOpts, dgraphClient)
 	defer l.zeroconn.Close()
-	defer l.kv.Close()
-	defer l.alloc.EvictAll()
 
 	if len(opt.schemaFile) > 0 {
 		if err := processSchemaFile(ctx, opt.schemaFile, dgraphClient); err != nil {
@@ -397,5 +383,9 @@ func run() error {
 	fmt.Printf("Time spent : %v\n", c.Elapsed)
 	fmt.Printf("N-Quads processed per second : %d\n", rate)
 
+	if l.db != nil {
+		l.alloc.Flush()
+		l.db.Close()
+	}
 	return nil
 }
diff --git a/worker/mutation.go b/worker/mutation.go
index 362f55e2d83..2900fa33fb4 100644
--- a/worker/mutation.go
+++ b/worker/mutation.go
@@ -290,7 +290,7 @@ func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error {
 		return x.Errorf("Input for predicate %s of type uid is scalar", edge.Attr)
 
 	case schemaType.IsScalar() && !storageType.IsScalar():
-		return x.Errorf("Input for predicate %s of type scalar is uid", edge.Attr)
+		return x.Errorf("Input for predicate %s of type scalar is uid. Edge: %v", edge.Attr, edge)
 
 	// The suggested storage type matches the schema, OK!
 	case storageType == schemaType && schemaType != types.DefaultID:
diff --git a/x/x.go b/x/x.go
index 945e0bd473e..a0f1d48d517 100644
--- a/x/x.go
+++ b/x/x.go
@@ -433,7 +433,7 @@ func SetupConnection(host string, tlsConf *TLSHelperConfig, useGz bool) (*grpc.C
 		grpc.WithBlock(),
 		grpc.WithTimeout(10*time.Second))
 
-	if tlsConf.CertRequired {
+	if tlsConf != nil && tlsConf.CertRequired {
 		tlsConf.ConfigType = TLSClientConfig
 		tlsCfg, _, err := GenerateTLSConfig(*tlsConf)
 		if err != nil {
diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go
index 395bbe8f4a5..8ebb67524be 100644
--- a/xidmap/xidmap.go
+++ b/xidmap/xidmap.go
@@ -17,10 +17,11 @@ package xidmap
 
 import (
-	"container/list"
 	"context"
 	"encoding/binary"
+	"math/rand"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"google.golang.org/grpc"
@@ -32,51 +33,31 @@ import (
 	"github.com/golang/glog"
 )
 
-// Options controls the performance characteristics of the XidMap.
-type Options struct {
-	// NumShards controls the number of shards the XidMap is broken into. More
-	// shards reduces lock contention.
-	NumShards int
-	// LRUSize controls the total size of the LRU cache. The LRU is split
-	// between all shards, so with 4 shards and an LRUSize of 100, each shard
-	// receives 25 LRU slots.
-	LRUSize int
-}
-
 // XidMap allocates and tracks mappings between Xids and Uids in a threadsafe
 // manner. It's memory friendly because the mapping is stored on disk, but fast
 // because it uses an LRU cache.
 type XidMap struct {
-	shards    []shard
-	kv        *badger.DB
-	opt       Options
-	newRanges chan *pb.AssignedIds
+	shards     []*shard
+	newRanges  chan *pb.AssignedIds
+	zc         pb.ZeroClient
+	maxUidSeen uint64
 
-	noMapMu sync.Mutex
-	noMap   block // block for allocating uids without an xid to uid mapping
+	// Optionally, these can be set to persist the mappings.
+	writer *badger.WriteBatch
 }
 
 type shard struct {
-	sync.Mutex
+	sync.RWMutex
 	block
 
-	elems        map[string]*list.Element
-	queue        *list.List
-	beingEvicted map[string]uint64
-
-	xm *XidMap
-}
-
-type mapping struct {
-	xid       string
-	uid       uint64
-	persisted bool
+	uidMap map[string]uint64
 }
 
 type block struct {
 	start, end uint64
 }
 
+// assign assumes the write lock is already acquired.
 func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 {
 	if b.end == 0 || b.start > b.end {
 		newRange := <-ch
@@ -88,32 +69,64 @@ func (b *block) assign(ch <-chan *pb.AssignedIds) uint64 {
 	return uid
 }
 
-// New creates an XidMap with given badger and uid provider.
-func New(kv *badger.DB, zero *grpc.ClientConn, opt Options) *XidMap {
-	x.AssertTrue(opt.LRUSize != 0)
-	x.AssertTrue(opt.NumShards != 0)
+// New creates an XidMap. zero conn must be valid for UID allocations to happen. Optionally, a
+// badger.DB can be provided to persist the xid to uid allocations. This would add latency to the
+// assignment operations.
+func New(zero *grpc.ClientConn, db *badger.DB) *XidMap {
+	numShards := 32
 	xm := &XidMap{
-		shards:    make([]shard, opt.NumShards),
-		kv:        kv,
-		opt:       opt,
-		newRanges: make(chan *pb.AssignedIds),
+		newRanges: make(chan *pb.AssignedIds, numShards),
+		shards:    make([]*shard, numShards),
 	}
 	for i := range xm.shards {
-		xm.shards[i].elems = make(map[string]*list.Element)
-		xm.shards[i].queue = list.New()
-		xm.shards[i].xm = xm
+		xm.shards[i] = &shard{
+			uidMap: make(map[string]uint64),
+		}
 	}
+	if db != nil {
+		// If DB is provided, let's load up all the xid -> uid mappings in memory.
+		xm.writer = db.NewWriteBatch()
+
+		err := db.View(func(txn *badger.Txn) error {
+			var count int
+			opt := badger.DefaultIteratorOptions
+			opt.PrefetchValues = false
+			itr := txn.NewIterator(opt)
+			defer itr.Close()
+			for itr.Rewind(); itr.Valid(); itr.Next() {
+				item := itr.Item()
+				key := string(item.Key())
+				sh := xm.shardFor(key)
+				err := item.Value(func(val []byte) error {
+					uid := binary.BigEndian.Uint64(val)
+					// No need to acquire a lock. This is all serial access.
+					sh.uidMap[key] = uid
+					return nil
+				})
+				if err != nil {
+					return err
+				}
+				count++
+			}
+			glog.Infof("Loaded up %d xid to uid mappings", count)
+			return nil
+		})
+		x.Check(err)
+	}
+	xm.zc = pb.NewZeroClient(zero)
+
 	go func() {
-		zc := pb.NewZeroClient(zero)
 		const initBackoff = 10 * time.Millisecond
 		const maxBackoff = 5 * time.Second
 		backoff := initBackoff
 		for {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-			assigned, err := zc.AssignUids(ctx, &pb.Num{Val: 10000})
+			assigned, err := xm.zc.AssignUids(ctx, &pb.Num{Val: 1e4})
+			glog.V(1).Infof("Assigned Uids: %+v. Err: %v", assigned, err)
 			cancel()
 			if err == nil {
 				backoff = initBackoff
+				xm.updateMaxSeen(assigned.EndId)
 				xm.newRanges <- assigned
 				continue
 			}
@@ -124,110 +137,99 @@ func New(kv *badger.DB, zero *grpc.ClientConn, opt Options) *XidMap {
 			}
 			time.Sleep(backoff)
 		}
-
 	}()
 	return xm
 }
 
+func (m *XidMap) shardFor(xid string) *shard {
+	fp := farm.Fingerprint32([]byte(xid))
+	idx := fp % uint32(len(m.shards))
+	return m.shards[idx]
+}
+
 // AssignUid creates new or looks up existing XID to UID mappings.
-func (m *XidMap) AssignUid(xid string) (uid uint64, isNew bool) {
-	fp := farm.Fingerprint64([]byte(xid))
-	idx := fp % uint64(m.opt.NumShards)
-	sh := &m.shards[idx]
+func (m *XidMap) AssignUid(xid string) uint64 {
+	sh := m.shardFor(xid)
+	sh.RLock()
+	uid := sh.uidMap[xid]
+	sh.RUnlock()
+	if uid > 0 {
+		return uid
+	}
 
 	sh.Lock()
 	defer sh.Unlock()
 
-	var ok bool
-	uid, ok = sh.lookup(xid)
-	if ok {
-		return uid, false
+	uid = sh.uidMap[xid]
+	if uid > 0 {
+		return uid
 	}
 
-	x.Check(m.kv.View(func(txn *badger.Txn) error {
-		item, err := txn.Get([]byte(xid))
-		if err == badger.ErrKeyNotFound {
-			return nil
+	newUid := sh.assign(m.newRanges)
+	sh.uidMap[xid] = newUid
+
+	if m.writer != nil {
+		var uidBuf [8]byte
+		binary.BigEndian.PutUint64(uidBuf[:], newUid)
+		if err := m.writer.Set([]byte(xid), uidBuf[:], 0); err != nil {
+			panic(err)
 		}
-		x.Check(err)
-		return item.Value(func(uidBuf []byte) error {
-			x.AssertTrue(len(uidBuf) > 0)
-			var n int
-			uid, n = binary.Uvarint(uidBuf)
-			x.AssertTrue(n == len(uidBuf))
-			ok = true
-			return nil
-		})
-	}))
-	if ok {
-		sh.add(xid, uid, true)
-		return uid, false
 	}
-
-	uid = sh.assign(m.newRanges)
-	sh.add(xid, uid, false)
-	return uid, true
+	return newUid
 }
 
-// AllocateUid gives a single uid without creating an xid to uid mapping.
-func (m *XidMap) AllocateUid() uint64 {
-	m.noMapMu.Lock()
-	defer m.noMapMu.Unlock()
-	return m.noMap.assign(m.newRanges)
+func (sh *shard) Current() uint64 {
+	sh.RLock()
+	defer sh.RUnlock()
+	return sh.start
 }
 
-func (s *shard) lookup(xid string) (uint64, bool) {
-	elem, ok := s.elems[xid]
-	if ok {
-		s.queue.MoveToBack(elem)
-		return elem.Value.(*mapping).uid, true
-	}
-	if uid, ok := s.beingEvicted[xid]; ok {
-		s.add(xid, uid, true)
-		return uid, true
+func (m *XidMap) updateMaxSeen(max uint64) {
+	for {
+		prev := atomic.LoadUint64(&m.maxUidSeen)
+		if prev >= max {
+			return
+		}
+		atomic.CompareAndSwapUint64(&m.maxUidSeen, prev, max)
 	}
-	return 0, false
 }
 
-func (s *shard) add(xid string, uid uint64, persisted bool) {
-	lruSizePerShard := s.xm.opt.LRUSize / s.xm.opt.NumShards
-	if s.queue.Len() >= lruSizePerShard && len(s.beingEvicted) == 0 {
-		s.evict(0.5)
+// BumpTo can be used to make Zero allocate UIDs up to this given number. Attempts are made to
+// ensure all future allocations of UIDs be higher than this one, but results are not guaranteed.
+func (m *XidMap) BumpTo(uid uint64) {
+	curMax := atomic.LoadUint64(&m.maxUidSeen)
+	if uid <= curMax {
+		return
 	}
-	m := &mapping{
-		xid:       xid,
-		uid:       uid,
-		persisted: persisted,
+
+	for {
+		glog.V(1).Infof("Bumping up to %v", uid)
+		num := x.Max(uid-curMax, 1e4)
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		assigned, err := m.zc.AssignUids(ctx, &pb.Num{Val: num})
+		cancel()
+		if err == nil {
+			glog.V(1).Infof("Requested bump: %d. Got assigned: %v", uid, assigned)
+			m.updateMaxSeen(assigned.EndId)
+			return
+		} else {
+			glog.Errorf("While requesting AssignUids(%d): %v", num, err)
+		}
 	}
-	elem := s.queue.PushBack(m)
-	s.elems[xid] = elem
 }
 
-func (m *XidMap) EvictAll() {
-	for i := range make([]struct{}, len(m.shards)) {
-		m.shards[i].Lock()
-		m.shards[i].evict(1.0)
-		m.shards[i].Unlock()
-	}
+// AllocateUid gives a single uid without creating an xid to uid mapping.
+func (m *XidMap) AllocateUid() uint64 {
+	sh := m.shards[rand.Intn(len(m.shards))]
+	sh.Lock()
+	defer sh.Unlock()
+	return sh.assign(m.newRanges)
 }
 
-func (s *shard) evict(ratio float64) {
-	evict := int(float64(s.queue.Len()) * ratio)
-	s.beingEvicted = make(map[string]uint64)
-	txn := s.xm.kv.NewTransaction(true)
-	defer txn.Discard()
-	for i := 0; i < evict; i++ {
-		m := s.queue.Remove(s.queue.Front()).(*mapping)
-		delete(s.elems, m.xid)
-		s.beingEvicted[m.xid] = m.uid
-		if !m.persisted {
-			var uidBuf [binary.MaxVarintLen64]byte
-			n := binary.PutUvarint(uidBuf[:], m.uid)
-			txn.Set([]byte(m.xid), uidBuf[:n])
-		}
-
+// Flush must be called if DB is provided to XidMap.
+func (m *XidMap) Flush() error {
+	if m.writer == nil {
+		return nil
 	}
-	x.Check(txn.Commit())
-	s.beingEvicted = nil
+	return m.writer.Flush()
 }
diff --git a/xidmap/xidmap_test.go b/xidmap/xidmap_test.go
new file mode 100644
index 00000000000..c70e3db88f7
--- /dev/null
+++ b/xidmap/xidmap_test.go
@@ -0,0 +1,128 @@
+package xidmap
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/dgraph-io/badger"
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/stretchr/testify/require"
+)
+
+// Opens a badger db and runs a test on it.
+func withDB(t *testing.T, test func(db *badger.DB)) {
+	dir, err := ioutil.TempDir(".", "badger-test")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	opt := badger.LSMOnlyOptions
+	opt.Dir = dir
+	opt.ValueDir = dir
+
+	db, err := badger.Open(opt)
+	require.NoError(t, err)
+	defer db.Close()
+
+	test(db)
+}
+
+func TestXidmap(t *testing.T) {
+	conn, err := x.SetupConnection("localhost:5080", nil, false)
+	require.NoError(t, err)
+	require.NotNil(t, conn)
+
+	withDB(t, func(db *badger.DB) {
+		xidmap := New(conn, db)
+
+		uida := xidmap.AssignUid("a")
+		require.Equal(t, uida, xidmap.AssignUid("a"))
+
+		uidb := xidmap.AssignUid("b")
+		require.True(t, uida != uidb)
+		require.Equal(t, uidb, xidmap.AssignUid("b"))
+
+		to := xidmap.AllocateUid() + uint64(1e6+3)
+		xidmap.BumpTo(to)
+		uid := xidmap.AllocateUid() // Does not have to be above the bump.
+		t.Logf("bump up to: %d. allocated: %d", to, uid)
+
+		require.NoError(t, xidmap.Flush())
+		xidmap = nil
+
+		xidmap2 := New(conn, db)
+		require.Equal(t, uida, xidmap2.AssignUid("a"))
+		require.Equal(t, uidb, xidmap2.AssignUid("b"))
+	})
+}
+
+func TestXidmapMemory(t *testing.T) {
+	var loop uint32
+	bToMb := func(b uint64) uint64 {
+		return b / 1024 / 1024
+	}
+	printMemory := func() {
+		var m runtime.MemStats
+		runtime.ReadMemStats(&m)
+		// For info on each, see: https://golang.org/pkg/runtime/#MemStats
+		fmt.Printf(" Heap = %v M", bToMb(m.HeapInuse))
+		fmt.Printf(" Alloc = %v M", bToMb(m.Alloc))
+		fmt.Printf(" Sys = %v M", bToMb(m.Sys))
+		fmt.Printf(" Loop = %.2fM", float64(atomic.LoadUint32(&loop))/1e6)
+		fmt.Printf(" NumGC = %v\n", m.NumGC)
+	}
+	go func() {
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+
+		for range ticker.C {
+			printMemory()
+		}
+	}()
+
+	conn, err := x.SetupConnection("localhost:5080", nil, false)
+	require.NoError(t, err)
+	require.NotNil(t, conn)
+
+	xidmap := New(conn, nil)
+
+	start := time.Now()
+	var wg sync.WaitGroup
+	for numGo := 0; numGo < 32; numGo++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				i := atomic.AddUint32(&loop, 1)
+				if i > 50e6 {
+					return
+				}
+				xidmap.AssignUid(fmt.Sprintf("xid-%d", i))
+			}
+		}()
+	}
+	wg.Wait()
+	t.Logf("Time taken: %v", time.Since(start).Round(time.Millisecond))
+}
+
+func BenchmarkXidmap(b *testing.B) {
+	conn, err := x.SetupConnection("localhost:5080", nil, false)
+	x.Check(err)
+	x.AssertTrue(conn != nil)
+
+	var counter uint64
+	xidmap := New(conn, nil)
+	b.ResetTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			xid := atomic.AddUint64(&counter, 1)
+			xidmap.AssignUid(fmt.Sprintf("xid-%d", xid))
+		}
+	})
+}
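
Usage sketch (supplementary note, not part of the patch): the tests above exercise the reworked xidmap API, which this sketch summarizes for callers. After this change, New takes the Zero connection first and an optional *badger.DB for persistence; AssignUid returns only the uid (the isNew flag is gone); and Flush must be called before closing the DB, since assignments are written through a WriteBatch rather than per-key transactions. The Zero address and the ./xids directory below are illustrative assumptions, not values from the diff.

// Minimal caller sketch, assuming a Zero instance at localhost:5080
// and a writable ./xids directory.
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/dgraph/x"
	"github.com/dgraph-io/dgraph/xidmap"
)

func main() {
	// SetupConnection now tolerates a nil TLS config (see the x/x.go hunk).
	conn, err := x.SetupConnection("localhost:5080", nil, false)
	if err != nil {
		log.Fatal(err)
	}

	// Optional: pass a badger.DB so xid -> uid assignments survive restarts.
	// Pass nil instead to keep the map purely in memory.
	opt := badger.DefaultOptions
	opt.Dir = "./xids"
	opt.ValueDir = "./xids"
	db, err := badger.Open(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	xm := xidmap.New(conn, db)

	// AssignUid is idempotent: the same xid always yields the same uid.
	uid := xm.AssignUid("alice")
	fmt.Printf("alice => %#x\n", uid)

	// Flush the pending WriteBatch before closing the DB; otherwise
	// mappings assigned since startup may be lost.
	x.Check(xm.Flush())
}

This mirrors how dgraph/cmd/live/run.go uses the map after the change: it opens the DB only when a client directory is configured, passes nil otherwise, and calls alloc.Flush() followed by db.Close() on exit.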