Skip to content

Commit

Permalink
opt(recurse): Optimise recurse and bring range iterators from sroar (#…
Browse files Browse the repository at this point in the history
…7989)

Optimise recurse by using reached map with bitmap as the map value, so a direct
bitmap `AndNot` can be done to get the list of unexplored edges. Also, use
range iterators and the fast iterator. A bunch of other small optimisations.

Co-authored-by: Manish R Jain <[email protected]>
  • Loading branch information
ahsanbarkati and manishrjain authored Aug 16, 2021
1 parent 1966245 commit 97d5841
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 131 deletions.
2 changes: 1 addition & 1 deletion algo/uidlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func ApplyFilter(u *pb.List, f func(uint64, int) bool) {
} else {
b := sroar.NewBitmap()
b.SetMany(out)
u.Bitmap = codec.ToBytes(b)
u.Bitmap = b.ToBuffer()
}
}

Expand Down
53 changes: 32 additions & 21 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ApproxLen(bitmap []byte) int {

func ToList(rm *sroar.Bitmap) *pb.List {
return &pb.List{
Bitmap: ToBytes(rm),
Bitmap: rm.ToBufferWithCopy(),
}
}

Expand All @@ -62,7 +62,7 @@ func ListCardinality(l *pb.List) uint64 {
if len(l.SortedUids) > 0 {
return uint64(len(l.SortedUids))
}
b := FromList(l)
b := FromListNoCopy(l)
return uint64(b.GetCardinality())
}

Expand All @@ -88,7 +88,7 @@ func SetUids(l *pb.List, uids []uint64) {
} else {
r := sroar.NewBitmap()
r.SetMany(uids)
l.Bitmap = ToBytes(r)
l.Bitmap = r.ToBuffer()
}
}

Expand Down Expand Up @@ -132,35 +132,46 @@ func Merge(matrix []*pb.List) *sroar.Bitmap {
if len(matrix) == 0 {
return out
}
out.Or(FromList(matrix[0]))
for _, l := range matrix[1:] {
r := FromList(l)
out.Or(r)
}
return out
}

func ToBytes(bm *sroar.Bitmap) []byte {
if bm.IsEmpty() {
return nil
var bms []*sroar.Bitmap
for _, m := range matrix {
if bmc := FromListNoCopy(m); bmc != nil {
bms = append(bms, bmc)
}
}
// TODO: We should not use ToBufferWithCopy always.
return bm.ToBufferWithCopy()
return sroar.FastOr(bms...)
}

func FromList(l *pb.List) *sroar.Bitmap {
iw := sroar.NewBitmap()
if l == nil {
return iw
return nil
}
// Keep the check for bitmap before sortedUids because we expect to have bitmap very often
if len(l.Bitmap) > 0 {
return sroar.FromBufferWithCopy(l.Bitmap)
}
if len(l.SortedUids) > 0 {
iw.SetMany(l.SortedUids)
bm := sroar.NewBitmap()
bm.SetMany(l.SortedUids)
return bm
}
return sroar.NewBitmap()
}

func FromListNoCopy(l *pb.List) *sroar.Bitmap {
if l == nil {
return nil
}
// Keep the check for bitmap before sortedUids because we expect to have bitmap very often
if len(l.Bitmap) > 0 {
// TODO: We should not use FromBufferWithCopy always.
iw = sroar.FromBufferWithCopy(l.Bitmap)
return sroar.FromBuffer(l.Bitmap)
}
if len(l.SortedUids) > 0 {
bm := sroar.NewBitmap()
bm.SetMany(l.SortedUids)
return bm
}
return iw
return sroar.NewBitmap()
}

func FromBytes(buf []byte) *sroar.Bitmap {
Expand Down
3 changes: 1 addition & 2 deletions dgraph/cmd/bulk/count_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sync/atomic"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -146,7 +145,7 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) {
return
}

pl.Bitmap = codec.ToBytes(bm)
pl.Bitmap = bm.ToBuffer()

kv := posting.MarshalPostingList(&pl, nil)
kv.Key = append([]byte{}, lastCe.Key()...)
Expand Down
3 changes: 1 addition & 2 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
bo "github.com/dgraph-io/badger/v3/options"
bpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -615,7 +614,7 @@ func (r *reducer) toList(req *encodeRequest) {

// We should not do defer FreePack here, because we might be giving ownership of it away if
// we run Rollup.
pl.Bitmap = codec.ToBytes(bm)
pl.Bitmap = bm.ToBuffer()
numUids := bm.GetCardinality()

atomic.AddInt64(&r.prog.reduceKeyCount, 1)
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/ristretto v0.1.0
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2
github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
Expand Down Expand Up @@ -75,13 +75,14 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744
golang.org/x/text v0.3.6
golang.org/x/tools v0.1.1
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16
google.golang.org/api v0.46.0
google.golang.org/genproto v0.0.0-20210510173355-fb37daa5cd7a // indirect
google.golang.org/grpc v1.37.1
google.golang.org/grpc/examples v0.0.0-20210518002758-2713b77e8526 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.13.1 // indirect
gopkg.in/square/go-jose.v2 v2.3.1
gopkg.in/yaml.v2 v2.2.8
honnef.co/go/tools v0.2.0 // indirect
src.techknowlogick.com/xgo v1.4.1-0.20210311222705-d25c33fcd864
)
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/Lu
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2 h1:r6+OcMwALExXQjyoZPBBKjGHTTioBU39J1aagrxgzN4=
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE=
github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6 h1:n7NOldILQ6Dy6Z2uaNWb1ZzSO4dj0DxDvyZ2DwXzKdI=
github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
Expand Down Expand Up @@ -1007,8 +1007,8 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16 h1:ZC/gVBZl8poJyKzWLxxlsmhayVGosF4mohR35szD5Bg=
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -1185,8 +1185,9 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE=
honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
18 changes: 9 additions & 9 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ func (l *List) iterateAll(readTs uint64, afterUid uint64, f func(obj *pb.Posting

advance := func() {
next = math.MaxUint64
if uitr.HasNext() {
next = uitr.Next()
if nx := uitr.Next(); nx > 0 {
next = nx
}
}
advance()
Expand Down Expand Up @@ -639,8 +639,8 @@ func (l *List) iterateAll(readTs uint64, afterUid uint64, f func(obj *pb.Posting

codec.RemoveRange(bm, 0, maxUid)
uitr = bm.NewIterator()
for uitr.HasNext() {
p.Uid = uitr.Next()
for u := uitr.Next(); u > 0; u = uitr.Next() {
p.Uid = u
f(p)
}
return nil
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (ro *rollupOutput) split(startUid uint64) error {
// Remove everything from startUid to uid.
nr := r.Clone()
nr.RemoveRange(0, uid) // Keep all uids >= uid.
newpl.Bitmap = codec.ToBytes(nr)
newpl.Bitmap = nr.ToBuffer()

// Take everything from the first posting where posting.Uid >= uid.
idx := sort.Search(len(pl.Postings), func(i int) bool {
Expand All @@ -1117,7 +1117,7 @@ func (ro *rollupOutput) split(startUid uint64) error {

// Update pl as well. Keeps the lower UIDs.
codec.RemoveRange(r, uid, math.MaxUint64)
pl.Bitmap = codec.ToBytes(r)
pl.Bitmap = r.ToBuffer()
pl.Postings = pl.Postings[:idx]

return nil
Expand Down Expand Up @@ -1169,7 +1169,7 @@ func (l *List) encode(out *rollupOutput, readTs uint64, split bool) error {
}

plist := &pb.PostingList{}
plist.Bitmap = codec.ToBytes(r)
plist.Bitmap = r.ToBuffer()

out.parts[startUid] = plist
}
Expand Down Expand Up @@ -1283,7 +1283,7 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
// Before this, we were only picking math.Int32 number of uids.
// Now we're picking everything.
if opt.First == 0 {
out.Bitmap = codec.ToBytes(bm)
out.Bitmap = bm.ToBufferWithCopy()
// TODO: Not yet ready to use Bitmap for data transfer. We'd have to deal with all the
// places where List.Uids is being called.
// out.Bitmap = codec.ToBytes(bm)
Expand Down Expand Up @@ -1688,7 +1688,7 @@ func FromBackupPostingList(bl *pb.BackupPostingList) *pb.PostingList {
} else if len(bl.UidBytes) > 0 {
r = codec.FromBackup(bl.UidBytes)
}
l.Bitmap = codec.ToBytes(r)
l.Bitmap = r.ToBuffer()
l.Postings = bl.Postings
l.CommitTs = bl.CommitTs
l.Splits = bl.Splits
Expand Down
6 changes: 3 additions & 3 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,9 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}
// TODO: Fix this up later.
// cachedVal, ok := lCache.Get(key)
// if ok {
Expand All @@ -537,9 +540,6 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
// }
// }

if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}
txn := pstore.NewTransactionAt(readTs, false)
defer txn.Discard()

Expand Down
2 changes: 1 addition & 1 deletion query/groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (d *dedup) addValue(attr string, value types.Val, uid uint64) {
}
r := codec.FromList(cur.elements[strKey].entities)
r.Set(uid)
cur.elements[strKey].entities.Bitmap = codec.ToBytes(r)
cur.elements[strKey].entities.Bitmap = r.ToBuffer()
}

func aggregateGroup(grp *groupResult, child *SubGraph) (types.Val, error) {
Expand Down
16 changes: 8 additions & 8 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func (sg *SubGraph) populate(uids []uint64) error {
sort.Slice(uids, func(i, j int) bool { return uids[i] < uids[j] })
r := sroar.NewBitmap()
r.SetMany(uids)
sg.uidMatrix = []*pb.List{{Bitmap: codec.ToBytes(r)}}
sg.uidMatrix = []*pb.List{{Bitmap: r.ToBuffer()}}
// User specified list may not be sorted.
sg.SrcUIDs = &pb.List{SortedUids: uids}
return nil
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func (sg *SubGraph) valueVarAggregation(doneVars map[string]varValue, path []*Su
mp := make(map[uint64]types.Val)
rangeOver := sg.SrcUIDs
if parent == nil {
rangeOver = &pb.List{Bitmap: codec.ToBytes(sg.DestMap)}
rangeOver = &pb.List{Bitmap: sg.DestMap.ToBuffer()}
}
if rangeOver == nil {
it := doneVars[sg.Params.Var]
Expand Down Expand Up @@ -1433,8 +1433,8 @@ func (sg *SubGraph) populateVarMap(doneVars map[string]varValue, sgPath []*SubGr
// Filter out UIDs that don't have atleast one UID in every child.
itr := sg.DestMap.NewIterator()
out := sg.DestMap.Clone()
for i := 0; itr.HasNext(); i++ {
uid := itr.Next()
uid := itr.Next()
for i := 0; uid > 0; i++ {
var exclude bool
for _, child := range sg.Children {
// For uid we dont actually populate the uidMatrix or values. So a node asking for
Expand All @@ -1458,6 +1458,7 @@ func (sg *SubGraph) populateVarMap(doneVars map[string]varValue, sgPath []*SubGr
if exclude {
out.Remove(uid)
}
uid = itr.Next()
}
// Note the we can't overwrite DestUids, as it'd also modify the SrcUids of
// next level and the mapping from SrcUids to uidMatrix would be lost.
Expand Down Expand Up @@ -1775,8 +1776,7 @@ func (sg *SubGraph) fillVars(mp map[string]varValue) error {
case (v.Typ == gql.UidVar && sg.SrcFunc != nil && sg.SrcFunc.Name == "uid_in"):
srcFuncArgs := sg.SrcFunc.Args[:0]
itr := l.UidMap.NewIterator()
for itr.HasNext() {
uid := itr.Next()
for uid := itr.Next(); uid > 0; uid = itr.Next() {
// We use base 10 here because the uid parser expects the uid to be in base 10.
arg := gql.Arg{Value: strconv.FormatUint(uid, 10)}
srcFuncArgs = append(srcFuncArgs, arg)
Expand Down Expand Up @@ -2469,7 +2469,7 @@ func (sg *SubGraph) applyRandom(ctx context.Context) error {

r := sroar.NewBitmap()
r.SetMany(uidList[:numRandom])
sg.uidMatrix[i].Bitmap = codec.ToBytes(r)
sg.uidMatrix[i].Bitmap = r.ToBuffer()
}

sg.DestMap = codec.Merge(sg.uidMatrix)
Expand All @@ -2490,7 +2490,7 @@ func (sg *SubGraph) applyPagination(ctx context.Context) error {
start, end := x.PageRange(sg.Params.Count, sg.Params.Offset, len(uids))
r := sroar.NewBitmap()
r.SetMany(uids[start:end])
sg.uidMatrix[i].Bitmap = codec.ToBytes(r)
sg.uidMatrix[i].Bitmap = r.ToBuffer()
}
// Re-merge the UID matrix.
sg.DestMap = codec.Merge(sg.uidMatrix)
Expand Down
Loading

0 comments on commit 97d5841

Please sign in to comment.