Skip to content

opt(recurse): Optimise recurse and bring range iterators from sroar #7989

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion algo/uidlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func ApplyFilter(u *pb.List, f func(uint64, int) bool) {
} else {
b := sroar.NewBitmap()
b.SetMany(out)
u.Bitmap = codec.ToBytes(b)
u.Bitmap = b.ToBuffer()
}
}

Expand Down
54 changes: 33 additions & 21 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ApproxLen(bitmap []byte) int {

func ToList(rm *sroar.Bitmap) *pb.List {
return &pb.List{
Bitmap: ToBytes(rm),
Bitmap: rm.ToBufferWithCopy(),
}
}

Expand All @@ -62,7 +62,7 @@ func ListCardinality(l *pb.List) uint64 {
if len(l.SortedUids) > 0 {
return uint64(len(l.SortedUids))
}
b := FromList(l)
b := FromListNoCopy(l)
return uint64(b.GetCardinality())
}

Expand All @@ -88,7 +88,7 @@ func SetUids(l *pb.List, uids []uint64) {
} else {
r := sroar.NewBitmap()
r.SetMany(uids)
l.Bitmap = ToBytes(r)
l.Bitmap = r.ToBuffer()
}
}

Expand Down Expand Up @@ -132,35 +132,47 @@ func Merge(matrix []*pb.List) *sroar.Bitmap {
if len(matrix) == 0 {
return out
}
out.Or(FromList(matrix[0]))
for _, l := range matrix[1:] {
r := FromList(l)
out.Or(r)
}
return out
}

func ToBytes(bm *sroar.Bitmap) []byte {
if bm.IsEmpty() {
return nil
var bms []*sroar.Bitmap
for _, m := range matrix {
bmc := FromListNoCopy(m)
if bmc != nil {
bms = append(bms, bmc)
}
}
// TODO: We should not use ToBufferWithCopy always.
return bm.ToBufferWithCopy()
return sroar.FastOr(bms...)
}

func FromList(l *pb.List) *sroar.Bitmap {
iw := sroar.NewBitmap()
if l == nil {
return iw
return nil
}
// Keep the check for bitmap before sortedUids because we expect to have bitmap very often
if len(l.Bitmap) > 0 {
return sroar.FromBufferWithCopy(l.Bitmap)
}
if len(l.SortedUids) > 0 {
iw.SetMany(l.SortedUids)
bm := sroar.NewBitmap()
bm.SetMany(l.SortedUids)
return bm
}
return sroar.NewBitmap()
}

func FromListNoCopy(l *pb.List) *sroar.Bitmap {
if l == nil {
return nil
}
// Keep the check for bitmap before sortedUids because we expect to have bitmap very often
if len(l.Bitmap) > 0 {
// TODO: We should not use FromBufferWithCopy always.
iw = sroar.FromBufferWithCopy(l.Bitmap)
return sroar.FromBuffer(l.Bitmap)
}
if len(l.SortedUids) > 0 {
bm := sroar.NewBitmap()
bm.SetMany(l.SortedUids)
return bm
}
return iw
return sroar.NewBitmap()
}

func FromBytes(buf []byte) *sroar.Bitmap {
Expand Down
3 changes: 1 addition & 2 deletions dgraph/cmd/bulk/count_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sync/atomic"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -146,7 +145,7 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) {
return
}

pl.Bitmap = codec.ToBytes(bm)
pl.Bitmap = bm.ToBuffer()

kv := posting.MarshalPostingList(&pl, nil)
kv.Key = append([]byte{}, lastCe.Key()...)
Expand Down
3 changes: 1 addition & 2 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
bo "github.com/dgraph-io/badger/v3/options"
bpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -615,7 +614,7 @@ func (r *reducer) toList(req *encodeRequest) {

// We should not do defer FreePack here, because we might be giving ownership of it away if
// we run Rollup.
pl.Bitmap = codec.ToBytes(bm)
pl.Bitmap = bm.ToBuffer()
numUids := bm.GetCardinality()

atomic.AddInt64(&r.prog.reduceKeyCount, 1)
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/ristretto v0.1.0
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2
github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
Expand Down Expand Up @@ -75,13 +75,14 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744
golang.org/x/text v0.3.6
golang.org/x/tools v0.1.1
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16
google.golang.org/api v0.46.0
google.golang.org/genproto v0.0.0-20210510173355-fb37daa5cd7a // indirect
google.golang.org/grpc v1.37.1
google.golang.org/grpc/examples v0.0.0-20210518002758-2713b77e8526 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.13.1 // indirect
gopkg.in/square/go-jose.v2 v2.3.1
gopkg.in/yaml.v2 v2.2.8
honnef.co/go/tools v0.2.0 // indirect
src.techknowlogick.com/xgo v1.4.1-0.20210311222705-d25c33fcd864
)
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/Lu
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2 h1:r6+OcMwALExXQjyoZPBBKjGHTTioBU39J1aagrxgzN4=
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE=
github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6 h1:n7NOldILQ6Dy6Z2uaNWb1ZzSO4dj0DxDvyZ2DwXzKdI=
github.com/dgraph-io/sroar v0.0.0-20210816191646-201f86ca72d6/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
Expand Down Expand Up @@ -1007,8 +1007,8 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16 h1:ZC/gVBZl8poJyKzWLxxlsmhayVGosF4mohR35szD5Bg=
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -1185,8 +1185,9 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE=
honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
18 changes: 9 additions & 9 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ func (l *List) iterateAll(readTs uint64, afterUid uint64, f func(obj *pb.Posting

advance := func() {
next = math.MaxUint64
if uitr.HasNext() {
next = uitr.Next()
if nx := uitr.Next(); nx > 0 {
next = nx
}
}
advance()
Expand Down Expand Up @@ -639,8 +639,8 @@ func (l *List) iterateAll(readTs uint64, afterUid uint64, f func(obj *pb.Posting

codec.RemoveRange(bm, 0, maxUid)
uitr = bm.NewIterator()
for uitr.HasNext() {
p.Uid = uitr.Next()
for u := uitr.Next(); u > 0; u = uitr.Next() {
p.Uid = u
f(p)
}
return nil
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (ro *rollupOutput) split(startUid uint64) error {
// Remove everything from startUid to uid.
nr := r.Clone()
nr.RemoveRange(0, uid) // Keep all uids >= uid.
newpl.Bitmap = codec.ToBytes(nr)
newpl.Bitmap = nr.ToBuffer()

// Take everything from the first posting where posting.Uid >= uid.
idx := sort.Search(len(pl.Postings), func(i int) bool {
Expand All @@ -1117,7 +1117,7 @@ func (ro *rollupOutput) split(startUid uint64) error {

// Update pl as well. Keeps the lower UIDs.
codec.RemoveRange(r, uid, math.MaxUint64)
pl.Bitmap = codec.ToBytes(r)
pl.Bitmap = r.ToBuffer()
pl.Postings = pl.Postings[:idx]

return nil
Expand Down Expand Up @@ -1169,7 +1169,7 @@ func (l *List) encode(out *rollupOutput, readTs uint64, split bool) error {
}

plist := &pb.PostingList{}
plist.Bitmap = codec.ToBytes(r)
plist.Bitmap = r.ToBuffer()

out.parts[startUid] = plist
}
Expand Down Expand Up @@ -1283,7 +1283,7 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
// Before this, we were only picking math.Int32 number of uids.
// Now we're picking everything.
if opt.First == 0 {
out.Bitmap = codec.ToBytes(bm)
out.Bitmap = bm.ToBufferWithCopy()
// TODO: Not yet ready to use Bitmap for data transfer. We'd have to deal with all the
// places where List.Uids is being called.
// out.Bitmap = codec.ToBytes(bm)
Expand Down Expand Up @@ -1688,7 +1688,7 @@ func FromBackupPostingList(bl *pb.BackupPostingList) *pb.PostingList {
} else if len(bl.UidBytes) > 0 {
r = codec.FromBackup(bl.UidBytes)
}
l.Bitmap = codec.ToBytes(r)
l.Bitmap = r.ToBuffer()
l.Postings = bl.Postings
l.CommitTs = bl.CommitTs
l.Splits = bl.Splits
Expand Down
6 changes: 3 additions & 3 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,9 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}
// TODO: Fix this up later.
// cachedVal, ok := lCache.Get(key)
// if ok {
Expand All @@ -537,9 +540,6 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
// }
// }

if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}
txn := pstore.NewTransactionAt(readTs, false)
defer txn.Discard()

Expand Down
2 changes: 1 addition & 1 deletion query/groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (d *dedup) addValue(attr string, value types.Val, uid uint64) {
}
r := codec.FromList(cur.elements[strKey].entities)
r.Set(uid)
cur.elements[strKey].entities.Bitmap = codec.ToBytes(r)
cur.elements[strKey].entities.Bitmap = r.ToBuffer()
}

func aggregateGroup(grp *groupResult, child *SubGraph) (types.Val, error) {
Expand Down
16 changes: 8 additions & 8 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func (sg *SubGraph) populate(uids []uint64) error {
sort.Slice(uids, func(i, j int) bool { return uids[i] < uids[j] })
r := sroar.NewBitmap()
r.SetMany(uids)
sg.uidMatrix = []*pb.List{{Bitmap: codec.ToBytes(r)}}
sg.uidMatrix = []*pb.List{{Bitmap: r.ToBuffer()}}
// User specified list may not be sorted.
sg.SrcUIDs = &pb.List{SortedUids: uids}
return nil
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func (sg *SubGraph) valueVarAggregation(doneVars map[string]varValue, path []*Su
mp := make(map[uint64]types.Val)
rangeOver := sg.SrcUIDs
if parent == nil {
rangeOver = &pb.List{Bitmap: codec.ToBytes(sg.DestMap)}
rangeOver = &pb.List{Bitmap: sg.DestMap.ToBuffer()}
}
if rangeOver == nil {
it := doneVars[sg.Params.Var]
Expand Down Expand Up @@ -1433,8 +1433,8 @@ func (sg *SubGraph) populateVarMap(doneVars map[string]varValue, sgPath []*SubGr
// Filter out UIDs that don't have at least one UID in every child.
itr := sg.DestMap.NewIterator()
out := sg.DestMap.Clone()
for i := 0; itr.HasNext(); i++ {
uid := itr.Next()
uid := itr.Next()
for i := 0; uid > 0; i++ {
var exclude bool
for _, child := range sg.Children {
// For uid we don't actually populate the uidMatrix or values. So a node asking for
Expand All @@ -1458,6 +1458,7 @@ func (sg *SubGraph) populateVarMap(doneVars map[string]varValue, sgPath []*SubGr
if exclude {
out.Remove(uid)
}
uid = itr.Next()
}
// Note that we can't overwrite DestUids, as it'd also modify the SrcUids of
// next level and the mapping from SrcUids to uidMatrix would be lost.
Expand Down Expand Up @@ -1775,8 +1776,7 @@ func (sg *SubGraph) fillVars(mp map[string]varValue) error {
case (v.Typ == gql.UidVar && sg.SrcFunc != nil && sg.SrcFunc.Name == "uid_in"):
srcFuncArgs := sg.SrcFunc.Args[:0]
itr := l.UidMap.NewIterator()
for itr.HasNext() {
uid := itr.Next()
for uid := itr.Next(); uid > 0; uid = itr.Next() {
// We use base 10 here because the uid parser expects the uid to be in base 10.
arg := gql.Arg{Value: strconv.FormatUint(uid, 10)}
srcFuncArgs = append(srcFuncArgs, arg)
Expand Down Expand Up @@ -2469,7 +2469,7 @@ func (sg *SubGraph) applyRandom(ctx context.Context) error {

r := sroar.NewBitmap()
r.SetMany(uidList[:numRandom])
sg.uidMatrix[i].Bitmap = codec.ToBytes(r)
sg.uidMatrix[i].Bitmap = r.ToBuffer()
}

sg.DestMap = codec.Merge(sg.uidMatrix)
Expand All @@ -2490,7 +2490,7 @@ func (sg *SubGraph) applyPagination(ctx context.Context) error {
start, end := x.PageRange(sg.Params.Count, sg.Params.Offset, len(uids))
r := sroar.NewBitmap()
r.SetMany(uids[start:end])
sg.uidMatrix[i].Bitmap = codec.ToBytes(r)
sg.uidMatrix[i].Bitmap = r.ToBuffer()
}
// Re-merge the UID matrix.
sg.DestMap = codec.Merge(sg.uidMatrix)
Expand Down
Loading