From 97d58414047c5340f2ce71ece6eb488d77236abd Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Tue, 17 Aug 2021 01:31:08 +0530 Subject: [PATCH] opt(recurse): Optimise recurse and bring range iterators from sroar (#7989) Optimise recurse by using reached map with bitmap as the map value, so a direct bitmap `AndNot` can be done to get the list of unexplored edges. Also, use range iterators and the fast iterator. A bunch of other small optimisations. Co-authored-by: Manish R Jain --- algo/uidlist.go | 2 +- codec/codec.go | 53 +++++++++++-------- dgraph/cmd/bulk/count_index.go | 3 +- dgraph/cmd/bulk/reduce.go | 3 +- go.mod | 5 +- go.sum | 11 ++-- posting/list.go | 18 +++---- posting/mvcc.go | 6 +-- query/groupby.go | 2 +- query/query.go | 16 +++--- query/recurse.go | 48 +++++++++++++----- worker/sort.go | 5 +- worker/task.go | 93 ++++++++++++---------------------- worker/trigram.go | 3 +- 14 files changed, 137 insertions(+), 131 deletions(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index b38bb319b64..c7027b2eae4 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -43,7 +43,7 @@ func ApplyFilter(u *pb.List, f func(uint64, int) bool) { } else { b := sroar.NewBitmap() b.SetMany(out) - u.Bitmap = codec.ToBytes(b) + u.Bitmap = b.ToBuffer() } } diff --git a/codec/codec.go b/codec/codec.go index 1620887c09a..6c9e84b52a7 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -45,7 +45,7 @@ func ApproxLen(bitmap []byte) int { func ToList(rm *sroar.Bitmap) *pb.List { return &pb.List{ - Bitmap: ToBytes(rm), + Bitmap: rm.ToBufferWithCopy(), } } @@ -62,7 +62,7 @@ func ListCardinality(l *pb.List) uint64 { if len(l.SortedUids) > 0 { return uint64(len(l.SortedUids)) } - b := FromList(l) + b := FromListNoCopy(l) return uint64(b.GetCardinality()) } @@ -88,7 +88,7 @@ func SetUids(l *pb.List, uids []uint64) { } else { r := sroar.NewBitmap() r.SetMany(uids) - l.Bitmap = ToBytes(r) + l.Bitmap = r.ToBuffer() } } @@ -132,35 +132,46 @@ func Merge(matrix []*pb.List) *sroar.Bitmap { if len(matrix) == 0 { return out } - out.Or(FromList(matrix[0])) - for _, l := range matrix[1:] { - r := FromList(l) - out.Or(r) - } - return out -} -func ToBytes(bm *sroar.Bitmap) []byte { - if bm.IsEmpty() { - return nil + var bms []*sroar.Bitmap + for _, m := range matrix { + if bmc := FromListNoCopy(m); bmc != nil { + bms = append(bms, bmc) + } } - // TODO: We should not use ToBufferWithCopy always. - return bm.ToBufferWithCopy() + return sroar.FastOr(bms...) } func FromList(l *pb.List) *sroar.Bitmap { - iw := sroar.NewBitmap() if l == nil { - return iw + return nil + } + // Keep the check for bitmap before sortedUids because we expect to have bitmap very often + if len(l.Bitmap) > 0 { + return sroar.FromBufferWithCopy(l.Bitmap) } if len(l.SortedUids) > 0 { - iw.SetMany(l.SortedUids) + bm := sroar.NewBitmap() + bm.SetMany(l.SortedUids) + return bm } + return sroar.NewBitmap() +} + +func FromListNoCopy(l *pb.List) *sroar.Bitmap { + if l == nil { + return nil + } + // Keep the check for bitmap before sortedUids because we expect to have bitmap very often if len(l.Bitmap) > 0 { - // TODO: We should not use FromBufferWithCopy always. - iw = sroar.FromBufferWithCopy(l.Bitmap) + return sroar.FromBuffer(l.Bitmap) + } + if len(l.SortedUids) > 0 { + bm := sroar.NewBitmap() + bm.SetMany(l.SortedUids) + return bm } - return iw + return sroar.NewBitmap() } func FromBytes(buf []byte) *sroar.Bitmap { diff --git a/dgraph/cmd/bulk/count_index.go b/dgraph/cmd/bulk/count_index.go index 8d11d0a82e3..91f54462f35 100644 --- a/dgraph/cmd/bulk/count_index.go +++ b/dgraph/cmd/bulk/count_index.go @@ -24,7 +24,6 @@ import ( "sync/atomic" "github.com/dgraph-io/badger/v3" - "github.com/dgraph-io/dgraph/codec" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -146,7 +145,7 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) { return } - pl.Bitmap = codec.ToBytes(bm) + pl.Bitmap = bm.ToBuffer() kv := posting.MarshalPostingList(&pl, nil) kv.Key = append([]byte{}, lastCe.Key()...) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 5a9dc6bf979..303bfe59ae1 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -38,7 +38,6 @@ import ( bo "github.com/dgraph-io/badger/v3/options" bpb "github.com/dgraph-io/badger/v3/pb" "github.com/dgraph-io/badger/v3/y" - "github.com/dgraph-io/dgraph/codec" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -615,7 +614,7 @@ func (r *reducer) toList(req *encodeRequest) { // We should not do defer FreePack here, because we might be giving ownership of it away if // we run Rollup. - pl.Bitmap = codec.ToBytes(bm) + pl.Bitmap = bm.ToBuffer() numUids := bm.GetCardinality() atomic.AddInt64(&r.prog.reduceKeyCount, 1) diff --git a/go.mod b/go.mod index 19feaa2aff7..d7b61ee1be6 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 github.com/dgraph-io/ristretto v0.1.0 github.com/dgraph-io/simdjson-go v0.3.0 - github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2 + github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 @@ -75,7 +75,7 @@ require ( golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744 golang.org/x/text v0.3.6 - golang.org/x/tools v0.1.1 + golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16 google.golang.org/api v0.46.0 google.golang.org/genproto v0.0.0-20210510173355-fb37daa5cd7a // indirect google.golang.org/grpc v1.37.1 @@ -83,5 +83,6 @@ require ( gopkg.in/DataDog/dd-trace-go.v1 v1.13.1 // indirect gopkg.in/square/go-jose.v2 v2.3.1 gopkg.in/yaml.v2 v2.2.8 + honnef.co/go/tools v0.2.0 // indirect src.techknowlogick.com/xgo v1.4.1-0.20210311222705-d25c33fcd864 ) diff --git a/go.sum b/go.sum index eb346073610..d7831f99dc5 100644 --- a/go.sum +++ b/go.sum @@ -182,8 +182,8 @@ github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/Lu github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0= github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY= -github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2 h1:r6+OcMwALExXQjyoZPBBKjGHTTioBU39J1aagrxgzN4= -github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE= +github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6 h1:n7NOldILQ6Dy6Z2uaNWb1ZzSO4dj0DxDvyZ2DwXzKdI= +github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU= @@ -1007,8 +1007,8 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= -golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs= -golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16 h1:ZC/gVBZl8poJyKzWLxxlsmhayVGosF4mohR35szD5Bg= +golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1185,8 +1185,9 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE= +honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/posting/list.go b/posting/list.go index 10843d07463..649bccb065d 100644 --- a/posting/list.go +++ b/posting/list.go @@ -605,8 +605,8 @@ func (l *List) iterateAll(readTs uint64, afterUid uint64, f func(obj *pb.Posting advance := func() { next = math.MaxUint64 - if uitr.HasNext() { - next = uitr.Next() + if nx := uitr.Next(); nx > 0 { + next = nx } } advance() @@ -639,8 +639,8 @@ func (l *List) iterateAll(readTs uint64, afterUid uint64, f func(obj *pb.Posting codec.RemoveRange(bm, 0, maxUid) uitr = bm.NewIterator() - for uitr.HasNext() { - p.Uid = uitr.Next() + for u := uitr.Next(); u > 0; u = uitr.Next() { + p.Uid = u f(p) } return nil @@ -1107,7 +1107,7 @@ func (ro *rollupOutput) split(startUid uint64) error { // Remove everything from startUid to uid. nr := r.Clone() nr.RemoveRange(0, uid) // Keep all uids >= uid. - newpl.Bitmap = codec.ToBytes(nr) + newpl.Bitmap = nr.ToBuffer() // Take everything from the first posting where posting.Uid >= uid. idx := sort.Search(len(pl.Postings), func(i int) bool { @@ -1117,7 +1117,7 @@ func (ro *rollupOutput) split(startUid uint64) error { // Update pl as well. Keeps the lower UIDs. codec.RemoveRange(r, uid, math.MaxUint64) - pl.Bitmap = codec.ToBytes(r) + pl.Bitmap = r.ToBuffer() pl.Postings = pl.Postings[:idx] return nil @@ -1169,7 +1169,7 @@ func (l *List) encode(out *rollupOutput, readTs uint64, split bool) error { } plist := &pb.PostingList{} - plist.Bitmap = codec.ToBytes(r) + plist.Bitmap = r.ToBuffer() out.parts[startUid] = plist } @@ -1283,7 +1283,7 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) { // Before this, we were only picking math.Int32 number of uids. // Now we're picking everything. if opt.First == 0 { - out.Bitmap = codec.ToBytes(bm) + out.Bitmap = bm.ToBufferWithCopy() // TODO: Not yet ready to use Bitmap for data transfer. We'd have to deal with all the // places where List.Uids is being called. // out.Bitmap = codec.ToBytes(bm) @@ -1688,7 +1688,7 @@ func FromBackupPostingList(bl *pb.BackupPostingList) *pb.PostingList { } else if len(bl.UidBytes) > 0 { r = codec.FromBackup(bl.UidBytes) } - l.Bitmap = codec.ToBytes(r) + l.Bitmap = r.ToBuffer() l.Postings = bl.Postings l.CommitTs = bl.CommitTs l.Splits = bl.Splits diff --git a/posting/mvcc.go b/posting/mvcc.go index 663d05fa384..e3c33cbf823 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -515,6 +515,9 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { } func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + if pstore.IsClosed() { + return nil, badger.ErrDBClosed + } // TODO: Fix this up later. // cachedVal, ok := lCache.Get(key) // if ok { @@ -537,9 +540,6 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { // } // } - if pstore.IsClosed() { - return nil, badger.ErrDBClosed - } txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() diff --git a/query/groupby.go b/query/groupby.go index 172c53e2393..ed2d035a6bd 100644 --- a/query/groupby.go +++ b/query/groupby.go @@ -137,7 +137,7 @@ func (d *dedup) addValue(attr string, value types.Val, uid uint64) { } r := codec.FromList(cur.elements[strKey].entities) r.Set(uid) - cur.elements[strKey].entities.Bitmap = codec.ToBytes(r) + cur.elements[strKey].entities.Bitmap = r.ToBuffer() } func aggregateGroup(grp *groupResult, child *SubGraph) (types.Val, error) { diff --git a/query/query.go b/query/query.go index 5ad6985d42b..f22ed0cf3d8 100644 --- a/query/query.go +++ b/query/query.go @@ -804,7 +804,7 @@ func (sg *SubGraph) populate(uids []uint64) error { sort.Slice(uids, func(i, j int) bool { return uids[i] < uids[j] }) r := sroar.NewBitmap() r.SetMany(uids) - sg.uidMatrix = []*pb.List{{Bitmap: codec.ToBytes(r)}} + sg.uidMatrix = []*pb.List{{Bitmap: r.ToBuffer()}} // User specified list may not be sorted. sg.SrcUIDs = &pb.List{SortedUids: uids} return nil @@ -1275,7 +1275,7 @@ func (sg *SubGraph) valueVarAggregation(doneVars map[string]varValue, path []*Su mp := make(map[uint64]types.Val) rangeOver := sg.SrcUIDs if parent == nil { - rangeOver = &pb.List{Bitmap: codec.ToBytes(sg.DestMap)} + rangeOver = &pb.List{Bitmap: sg.DestMap.ToBuffer()} } if rangeOver == nil { it := doneVars[sg.Params.Var] @@ -1433,8 +1433,8 @@ func (sg *SubGraph) populateVarMap(doneVars map[string]varValue, sgPath []*SubGr // Filter out UIDs that don't have atleast one UID in every child. itr := sg.DestMap.NewIterator() out := sg.DestMap.Clone() - for i := 0; itr.HasNext(); i++ { - uid := itr.Next() + uid := itr.Next() + for i := 0; uid > 0; i++ { var exclude bool for _, child := range sg.Children { // For uid we dont actually populate the uidMatrix or values. So a node asking for @@ -1458,6 +1458,7 @@ func (sg *SubGraph) populateVarMap(doneVars map[string]varValue, sgPath []*SubGr if exclude { out.Remove(uid) } + uid = itr.Next() } // Note the we can't overwrite DestUids, as it'd also modify the SrcUids of // next level and the mapping from SrcUids to uidMatrix would be lost. @@ -1775,8 +1776,7 @@ func (sg *SubGraph) fillVars(mp map[string]varValue) error { case (v.Typ == gql.UidVar && sg.SrcFunc != nil && sg.SrcFunc.Name == "uid_in"): srcFuncArgs := sg.SrcFunc.Args[:0] itr := l.UidMap.NewIterator() - for itr.HasNext() { - uid := itr.Next() + for uid := itr.Next(); uid > 0; uid = itr.Next() { // We use base 10 here because the uid parser expects the uid to be in base 10. arg := gql.Arg{Value: strconv.FormatUint(uid, 10)} srcFuncArgs = append(srcFuncArgs, arg) @@ -2469,7 +2469,7 @@ func (sg *SubGraph) applyRandom(ctx context.Context) error { r := sroar.NewBitmap() r.SetMany(uidList[:numRandom]) - sg.uidMatrix[i].Bitmap = codec.ToBytes(r) + sg.uidMatrix[i].Bitmap = r.ToBuffer() } sg.DestMap = codec.Merge(sg.uidMatrix) @@ -2490,7 +2490,7 @@ func (sg *SubGraph) applyPagination(ctx context.Context) error { start, end := x.PageRange(sg.Params.Count, sg.Params.Offset, len(uids)) r := sroar.NewBitmap() r.SetMany(uids[start:end]) - sg.uidMatrix[i].Bitmap = codec.ToBytes(r) + sg.uidMatrix[i].Bitmap = r.ToBuffer() } // Re-merge the UID matrix. sg.DestMap = codec.Merge(sg.uidMatrix) diff --git a/query/recurse.go b/query/recurse.go index d3ac1de80a7..f19e43af132 100644 --- a/query/recurse.go +++ b/query/recurse.go @@ -18,18 +18,19 @@ package query import ( "context" - "fmt" "math" + "strconv" "github.com/dgraph-io/dgraph/algo" "github.com/dgraph-io/dgraph/codec" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/sroar" "github.com/pkg/errors" ) func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error { // Note: Key format is - "attr|fromUID|toUID" - reachMap := make(map[string]struct{}) + reachMap := make(map[string]*sroar.Bitmap) allowLoop := start.Params.RecurseArgs.AllowLoop var numEdges uint64 var exec []*SubGraph @@ -121,21 +122,42 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error for mIdx, fromUID := range codec.GetUids(sg.SrcUIDs) { if allowLoop { + // TODO: This needs to be optimized. for _, ul := range sg.uidMatrix { numEdges += codec.ListCardinality(ul) } } else { - algo.ApplyFilter(sg.uidMatrix[mIdx], func(uid uint64, i int) bool { - key := fmt.Sprintf("%s|%d|%d", sg.Attr, fromUID, uid) - _, seen := reachMap[key] // Combine fromUID here. - if seen { - return false - } - // Mark this edge as taken. We'd disallow this edge later. - reachMap[key] = struct{}{} - numEdges++ - return true - }) + ul := sg.uidMatrix[mIdx] + ur := codec.FromListNoCopy(ul) + if ur == nil { + continue + } + + key := sg.Attr + "|" + strconv.Itoa(int(fromUID)) + prev, ok := reachMap[key] + if !ok { + reachMap[key] = codec.FromList(ul) + continue + } + // Any edges that we have seen before, do not consider + // them again. + if len(sg.uidMatrix[mIdx].SortedUids) > 0 { + // we will have to keep the order, so using ApplyFilter + algo.ApplyFilter(sg.uidMatrix[mIdx], func(uid uint64, i int) bool { + if ur.Contains(uid) { + return false + } + numEdges++ + return true + }) + } else { + ur.AndNot(prev) // This would only keep the UIDs which are NEW. + sg.uidMatrix[mIdx].Bitmap = ur.ToBuffer() + numEdges += uint64(ur.GetCardinality()) + + prev.Or(ur) // Add the new UIDs to our "reach" + reachMap[key] = prev + } } } if len(sg.Params.Order) > 0 || len(sg.Params.FacetsOrder) > 0 { diff --git a/worker/sort.go b/worker/sort.go index 4aca6c2eaab..96da8aebe26 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -413,7 +413,8 @@ func multiSort(ctx context.Context, r *sortresult, ts *pb.SortMessage) error { dsz := int(dest.GetCardinality()) x.AssertTrue(len(result.ValueMatrix) == dsz) itr := dest.NewIterator() - for i := 0; itr.HasNext(); i++ { + uid := itr.Next() + for i := 0; uid > 0; i++ { var sv types.Val if len(result.ValueMatrix[i].Values) == 0 { // Assign nil value which is sorted as greater than all other values. @@ -428,8 +429,8 @@ func multiSort(ctx context.Context, r *sortresult, ts *pb.SortMessage) error { return err } } - uid := itr.Next() sortVals[uid][or.idx] = sv + uid = itr.Next() } } diff --git a/worker/task.go b/worker/task.go index 909c48ee0d5..ad5251d183e 100644 --- a/worker/task.go +++ b/worker/task.go @@ -39,10 +39,10 @@ import ( "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/sroar" "github.com/golang/glog" + "github.com/golang/protobuf/proto" otrace "go.opencensus.io/trace" "golang.org/x/sync/errgroup" - "github.com/golang/protobuf/proto" cindex "github.com/google/codesearch/index" cregexp "github.com/google/codesearch/regexp" "github.com/pkg/errors" @@ -381,30 +381,11 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er outputs := make([]*pb.Result, numGo) listType := schema.State().IsList(q.Attr) - calculate := func(start int) error { - x.AssertTrue(start%width == 0) + calculate := func(idx int, itr *sroar.Iterator) error { out := &pb.Result{} - outputs[start/width] = out - - startNum, err := bm.Select(uint64(start)) - x.Check(err) - - itr := bm.NewIterator() - itr.AdvanceIfNeeded(startNum) - - for count := 0; itr.HasNext(); count++ { - if count >= width { - break - } - if count%100 == 0 { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - } + outputs[idx] = out - uid := itr.Next() + for uid := itr.Next(); uid > 0; uid = itr.Next() { key := x.DataKey(q.Attr, uid) // Get or create the posting list for an entity, attribute combination. @@ -518,17 +499,18 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er // Add an empty UID list to make later processing consistent out.UidMatrix = append(out.UidMatrix, &pb.List{}) default: - out.UidMatrix = append(out.UidMatrix, codec.ToList(res)) + out.UidMatrix = append(out.UidMatrix, &pb.List{Bitmap: res.ToBuffer()}) } } return nil } // End of calculate function. + iters := bm.NewRangeIterators(numGo) var g errgroup.Group for i := 0; i < numGo; i++ { - start := i * width + i := i g.Go(func() error { - return calculate(start) + return calculate(i, iters[i]) }) } if err := g.Wait(); err != nil { @@ -561,16 +543,18 @@ func facetsFilterValuePostingList(args funcArgs, pl *posting.List, facetsTree *f if !pickMultiplePostings { // Retrieve the posting that matches the language preferences. - langMatch, err = pl.PostingFor(q.ReadTs, q.Langs) - if err != nil && err != posting.ErrNoValue { - return err + if len(q.Langs) > 0 { + langMatch, err = pl.PostingFor(q.ReadTs, q.Langs) + if err != nil && err != posting.ErrNoValue { + return err + } } } // TODO(Ashish): This function starts iteration from start(afterUID is always 0). This can be // optimized in come cases. For example when we know lang tag to fetch, we can directly jump // to posting starting with that UID(check list.ValueFor()). - return pl.IterateAll(q.ReadTs, 0, func(p *pb.Posting) error { + return pl.Iterate(q.ReadTs, 0, func(p *pb.Posting) error { if q.ExpandAll { // If q.ExpandAll is true we need to consider all postings irrespective of langs. } else if listType && len(q.Langs) == 0 { @@ -579,8 +563,12 @@ func facetsFilterValuePostingList(args funcArgs, pl *posting.List, facetsTree *f return nil } } else { + // Don't retrieve tagged values unless explicitly asked. + if len(q.Langs) == 0 && len(p.LangTag) > 0 { + return nil + } // Only consider the posting that matches our language preferences. - if !proto.Equal(p, langMatch) { + if len(q.Langs) > 0 && !proto.Equal(p, langMatch) { return nil } } @@ -1187,8 +1175,7 @@ func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) err filtered := sroar.NewBitmap() itr := uids.NewIterator() - for itr.HasNext() { - uid := itr.Next() + for uid := itr.Next(); uid > 0; uid = itr.Next() { select { case <-ctx.Done(): return ctx.Err() @@ -1229,7 +1216,7 @@ func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) err } list := &pb.List{ - Bitmap: codec.ToBytes(filtered), + Bitmap: filtered.ToBuffer(), } arg.out.UidMatrix = append(arg.out.UidMatrix, list) return nil @@ -1435,8 +1422,7 @@ func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) err filtered := sroar.NewBitmap() itr := uids.NewIterator() - for itr.HasNext() { - uid := itr.Next() + for uid := itr.Next(); uid > 0; uid = itr.Next() { select { case <-ctx.Done(): return ctx.Err() @@ -1478,7 +1464,7 @@ func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) err } out := &pb.List{ - Bitmap: codec.ToBytes(filtered), + Bitmap: filtered.ToBuffer(), } arg.out.UidMatrix = append(arg.out.UidMatrix, out) return nil @@ -1510,20 +1496,12 @@ func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error } filtered := make([]*sroar.Bitmap, numGo) - filter := func(idx, start, end int) error { + filter := func(idx int, it *sroar.Iterator) error { filtered[idx] = sroar.NewBitmap() out := filtered[idx] - startUid, err := uids.Select(uint64(start)) - if err != nil { - return err - } - itr := uids.NewIterator() - itr.AdvanceIfNeeded(startUid) - - for uidx := start; uidx < end; uidx++ { - uid := itr.Next() + for uid := it.Next(); uid > 0; uid = it.Next() { pl, err := qs.cache.Get(x.DataKey(attr, uid)) if err != nil { return err @@ -1545,16 +1523,12 @@ func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error return nil } + iters := uids.NewRangeIterators(numGo) errCh := make(chan error, numGo) for i := 0; i < numGo; i++ { - start := i * width - end := start + width - if end > numUids { - end = numUids - } - go func(idx, start, end int) { - errCh <- filter(idx, start, end) - }(i, start, end) + go func(idx int, it *sroar.Iterator) { + errCh <- filter(idx, it) + }(i, iters[i]) } for i := 0; i < numGo; i++ { if err := <-errCh; err != nil { @@ -1572,7 +1546,7 @@ func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error } for i := 0; i < len(matrix); i++ { matrix[i].And(final) - arg.out.UidMatrix[i].Bitmap = codec.ToBytes(matrix[i]) + arg.out.UidMatrix[i].Bitmap = matrix[i].ToBuffer() } return nil } @@ -1633,8 +1607,7 @@ func (qs *queryState) filterStringFunction(arg funcArgs) error { // We can't directly modify uids bitmap. We need to add them to another bitmap, and then take // the difference. remove := sroar.NewBitmap() - for itr.HasNext() { - uid := itr.Next() + for uid := itr.Next(); uid > 0; uid = itr.Next() { vals, err := qs.getValsForUID(attr, lang, uid, arg.q.ReadTs) switch { case err == posting.ErrNoValue: @@ -1660,7 +1633,7 @@ func (qs *queryState) filterStringFunction(arg funcArgs) error { uids.AndNot(remove) for i := 0; i < len(matrix); i++ { matrix[i].And(uids) - arg.out.UidMatrix[i].Bitmap = codec.ToBytes(matrix[i]) + arg.out.UidMatrix[i].Bitmap = matrix[i].ToBuffer() } return nil } @@ -2510,7 +2483,7 @@ loop: if span != nil { span.Annotatef(nil, "handleHasFunction found %d uids", setCnt) } - result := &pb.List{Bitmap: codec.ToBytes(res)} + result := &pb.List{Bitmap: res.ToBuffer()} out.UidMatrix = append(out.UidMatrix, result) return nil } diff --git a/worker/trigram.go b/worker/trigram.go index dbacd0cbd56..c7f341a54a1 100644 --- a/worker/trigram.go +++ b/worker/trigram.go @@ -21,7 +21,6 @@ import ( cindex "github.com/google/codesearch/index" - "github.com/dgraph-io/dgraph/codec" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/tok" @@ -43,7 +42,7 @@ func uidsForRegex(attr string, arg funcArgs, // TODO: Unnecessary conversion here. Avoid if possible. if !intersect.IsEmpty() { opts.Intersect = &pb.List{ - Bitmap: codec.ToBytes(intersect), + Bitmap: intersect.ToBuffer(), } } else { intersect = sroar.NewBitmap()