Skip to content

opt(recurse): Optimise recurse and bring range iterators from sroar #7989

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ func ListCardinality(l *pb.List) uint64 {
if len(l.SortedUids) > 0 {
return uint64(len(l.SortedUids))
}
b := FromList(l)
b := FromListNoCopy(l)
if b == nil {
return 0
}
// TODO: Update GetCardinality to handle nil Bitmap.
return uint64(b.GetCardinality())
}

Expand Down Expand Up @@ -132,12 +136,15 @@ func Merge(matrix []*pb.List) *sroar.Bitmap {
if len(matrix) == 0 {
return out
}
out.Or(FromList(matrix[0]))
for _, l := range matrix[1:] {
r := FromList(l)
out.Or(r)

var bms []*sroar.Bitmap
for _, m := range matrix {
bmc := FromListNoCopy(m)
if bmc != nil {
bms = append(bms, bmc)
}
}
return out
return sroar.FastOr(bms...)
}

func ToBytes(bm *sroar.Bitmap) []byte {
Expand All @@ -148,6 +155,13 @@ func ToBytes(bm *sroar.Bitmap) []byte {
return bm.ToBufferWithCopy()
}

// ToBytesNoCopy returns the serialized form of bm as produced by
// bm.ToBuffer, or nil when bm reports itself as empty.
//
// NOTE(review): unlike ToBytes, which calls ToBufferWithCopy, the slice
// returned here presumably aliases the bitmap's internal storage, so the
// caller should not modify it or use it after bm changes — confirm against
// the sroar documentation.
func ToBytesNoCopy(bm *sroar.Bitmap) []byte {
	if !bm.IsEmpty() {
		return bm.ToBuffer()
	}
	return nil
}

func FromList(l *pb.List) *sroar.Bitmap {
iw := sroar.NewBitmap()
if l == nil {
Expand All @@ -163,6 +177,23 @@ func FromList(l *pb.List) *sroar.Bitmap {
return iw
}

// FromListNoCopy converts l into a bitmap, returning nil instead of an empty
// bitmap when there is nothing to convert (l is nil, or both SortedUids and
// Bitmap are empty).
//
// SortedUids takes precedence: when it is non-empty, a new bitmap is created
// and filled from it. Otherwise a non-empty l.Bitmap is handed directly to
// sroar.FromBuffer.
//
// NOTE(review): in the l.Bitmap case the result presumably shares memory with
// l.Bitmap rather than copying it — confirm against the sroar documentation
// before mutating the returned bitmap.
func FromListNoCopy(l *pb.List) *sroar.Bitmap {
	switch {
	case l == nil:
		return nil
	case len(l.SortedUids) > 0:
		fromUids := sroar.NewBitmap()
		fromUids.SetMany(l.SortedUids)
		return fromUids
	case len(l.Bitmap) > 0:
		// Optimize the code for this case. Avoid creating NewBitmap for error
		// handling.
		return sroar.FromBuffer(l.Bitmap)
	default:
		return nil
	}
}

func FromBytes(buf []byte) *sroar.Bitmap {
r := sroar.NewBitmap()
if buf == nil || len(buf) == 0 {
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/ristretto v0.1.0
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2
github.com/dgraph-io/sroar v0.0.0-20210816152148-de6cc6680eec
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
Expand Down Expand Up @@ -75,13 +75,14 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744
golang.org/x/text v0.3.6
golang.org/x/tools v0.1.1
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16
google.golang.org/api v0.46.0
google.golang.org/genproto v0.0.0-20210510173355-fb37daa5cd7a // indirect
google.golang.org/grpc v1.37.1
google.golang.org/grpc/examples v0.0.0-20210518002758-2713b77e8526 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.13.1 // indirect
gopkg.in/square/go-jose.v2 v2.3.1
gopkg.in/yaml.v2 v2.2.8
honnef.co/go/tools v0.2.0 // indirect
src.techknowlogick.com/xgo v1.4.1-0.20210311222705-d25c33fcd864
)
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/Lu
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2 h1:r6+OcMwALExXQjyoZPBBKjGHTTioBU39J1aagrxgzN4=
github.com/dgraph-io/sroar v0.0.0-20210812083743-986f6c1098e2/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE=
github.com/dgraph-io/sroar v0.0.0-20210816152148-de6cc6680eec h1:tNXffo6Ede/BZVR/zOkchfLwOx1irhplsP983LXP38I=
github.com/dgraph-io/sroar v0.0.0-20210816152148-de6cc6680eec/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
Expand Down Expand Up @@ -1007,8 +1007,8 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16 h1:ZC/gVBZl8poJyKzWLxxlsmhayVGosF4mohR35szD5Bg=
golang.org/x/tools v0.1.6-0.20210802203754-9b21a8868e16/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -1185,8 +1185,9 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE=
honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
8 changes: 4 additions & 4 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ func (l *List) iterateAll(readTs uint64, afterUid uint64, f func(obj *pb.Posting

advance := func() {
next = math.MaxUint64
if uitr.HasNext() {
next = uitr.Next()
if nx := uitr.Next(); nx > 0 {
next = nx
}
}
advance()
Expand Down Expand Up @@ -639,8 +639,8 @@ func (l *List) iterateAll(readTs uint64, afterUid uint64, f func(obj *pb.Posting

codec.RemoveRange(bm, 0, maxUid)
uitr = bm.NewIterator()
for uitr.HasNext() {
p.Uid = uitr.Next()
for u := uitr.Next(); u > 0; u = uitr.Next() {
p.Uid = u
f(p)
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,9 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}
// TODO: Fix this up later.
// cachedVal, ok := lCache.Get(key)
// if ok {
Expand All @@ -537,9 +540,6 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
// }
// }

if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}
txn := pstore.NewTransactionAt(readTs, false)
defer txn.Discard()

Expand Down
8 changes: 4 additions & 4 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,8 +1433,8 @@ func (sg *SubGraph) populateVarMap(doneVars map[string]varValue, sgPath []*SubGr
// Filter out UIDs that don't have at least one UID in every child.
itr := sg.DestMap.NewIterator()
out := sg.DestMap.Clone()
for i := 0; itr.HasNext(); i++ {
uid := itr.Next()
uid := itr.Next()
for i := 0; uid > 0; i++ {
var exclude bool
for _, child := range sg.Children {
// For uid we don't actually populate the uidMatrix or values. So a node asking for
Expand All @@ -1458,6 +1458,7 @@ func (sg *SubGraph) populateVarMap(doneVars map[string]varValue, sgPath []*SubGr
if exclude {
out.Remove(uid)
}
uid = itr.Next()
}
// Note that we can't overwrite DestUids, as it'd also modify the SrcUids of
// next level and the mapping from SrcUids to uidMatrix would be lost.
Expand Down Expand Up @@ -1775,8 +1776,7 @@ func (sg *SubGraph) fillVars(mp map[string]varValue) error {
case (v.Typ == gql.UidVar && sg.SrcFunc != nil && sg.SrcFunc.Name == "uid_in"):
srcFuncArgs := sg.SrcFunc.Args[:0]
itr := l.UidMap.NewIterator()
for itr.HasNext() {
uid := itr.Next()
for uid := itr.Next(); uid > 0; uid = itr.Next() {
// We use base 10 here because the uid parser expects the uid to be in base 10.
arg := gql.Arg{Value: strconv.FormatUint(uid, 10)}
srcFuncArgs = append(srcFuncArgs, arg)
Expand Down
48 changes: 35 additions & 13 deletions query/recurse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package query

import (
"context"
"fmt"
"math"
"strconv"

"github.com/dgraph-io/dgraph/algo"
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/sroar"
"github.com/pkg/errors"
)

func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error {
// Note: Key format is - "attr|fromUID|toUID"
reachMap := make(map[string]struct{})
reachMap := make(map[string]*sroar.Bitmap)
allowLoop := start.Params.RecurseArgs.AllowLoop
var numEdges uint64
var exec []*SubGraph
Expand Down Expand Up @@ -121,21 +122,42 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error

for mIdx, fromUID := range codec.GetUids(sg.SrcUIDs) {
if allowLoop {
// TODO: This needs to be optimized.
for _, ul := range sg.uidMatrix {
numEdges += codec.ListCardinality(ul)
}
} else {
algo.ApplyFilter(sg.uidMatrix[mIdx], func(uid uint64, i int) bool {
key := fmt.Sprintf("%s|%d|%d", sg.Attr, fromUID, uid)
_, seen := reachMap[key] // Combine fromUID here.
if seen {
return false
}
// Mark this edge as taken. We'd disallow this edge later.
reachMap[key] = struct{}{}
numEdges++
return true
})
ul := sg.uidMatrix[mIdx]
ur := codec.FromListNoCopy(ul)
if ur == nil {
continue
}

key := sg.Attr + "|" + strconv.Itoa(int(fromUID))
prev, ok := reachMap[key]
if !ok {
reachMap[key] = codec.FromList(ul)
continue
}
// Any edges that we have seen before, do not consider
// them again.
if len(sg.uidMatrix[mIdx].SortedUids) > 0 {
// we will have to keep the order, so using ApplyFilter
algo.ApplyFilter(sg.uidMatrix[mIdx], func(uid uint64, i int) bool {
if ur.Contains(uid) {
return false
}
numEdges++
return true
})
} else {
ur.AndNot(prev) // This would only keep the UIDs which are NEW.
sg.uidMatrix[mIdx].Bitmap = ur.ToBuffer()
numEdges += uint64(ur.GetCardinality())

prev.Or(ur) // Add the new UIDs to our "reach"
reachMap[key] = prev
}
}
}
if len(sg.Params.Order) > 0 || len(sg.Params.FacetsOrder) > 0 {
Expand Down
5 changes: 3 additions & 2 deletions worker/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ func multiSort(ctx context.Context, r *sortresult, ts *pb.SortMessage) error {
dsz := int(dest.GetCardinality())
x.AssertTrue(len(result.ValueMatrix) == dsz)
itr := dest.NewIterator()
for i := 0; itr.HasNext(); i++ {
uid := itr.Next()
for i := 0; uid > 0; i++ {
var sv types.Val
if len(result.ValueMatrix[i].Values) == 0 {
// Assign nil value which is sorted as greater than all other values.
Expand All @@ -428,8 +429,8 @@ func multiSort(ctx context.Context, r *sortresult, ts *pb.SortMessage) error {
return err
}
}
uid := itr.Next()
sortVals[uid][or.idx] = sv
uid = itr.Next()
}
}

Expand Down
Loading