Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(rollup): use NSplit API from sroar to improve rollup performance #8092

Merged
merged 39 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
963fb8b
initial work
NamanJain8 Nov 9, 2021
2c70208
clean
NamanJain8 Nov 9, 2021
f8199d9
update sroar
NamanJain8 Nov 9, 2021
a80c289
more changes
NamanJain8 Nov 9, 2021
7612247
remove old code and update sroar
NamanJain8 Nov 10, 2021
0756963
Add cushion
ahsanbarkati Nov 10, 2021
a56d6c4
fix edge case related to startUid = 1
NamanJain8 Nov 11, 2021
77d7306
Use FromSortedList
manishrjain Nov 11, 2021
3c51dc0
Try reducing the batch size
manishrjain Nov 12, 2021
3447c7a
Use handover skiplist
manishrjain Nov 12, 2021
26f081a
more output
manishrjain Nov 12, 2021
8fa79e5
revert last change
manishrjain Nov 12, 2021
7135ede
Print out about big skiplist
manishrjain Nov 12, 2021
b8d77a1
remove print statements and update sroar
NamanJain8 Nov 12, 2021
398dab0
do error checking on handover skip list
NamanJain8 Nov 13, 2021
296d969
Bring latest sroar
ahsanbarkati Nov 15, 2021
4bb7fac
Remove extra logs
ahsanbarkati Nov 15, 2021
ed65a2f
work on jupiter keys
manishrjain Nov 16, 2021
1b6b633
Asserts and sort fix.
ahsanbarkati Nov 16, 2021
c2b5663
assert on uid
ahsanbarkati Nov 16, 2021
0c900f8
Fix up a bug with reuse of posting list
manishrjain Nov 16, 2021
6ab63b6
add assert for sorted bitmap
NamanJain8 Nov 17, 2021
f7a9076
set key in empty pl
NamanJain8 Nov 17, 2021
74f8a94
Add code for jupiter keys
manishrjain Nov 17, 2021
20eb921
update sroar
ahsanbarkati Nov 18, 2021
9ed9271
Add test for jupiter keys
ahsanbarkati Nov 18, 2021
f147be0
Some fixes for max-split
ahsanbarkati Nov 18, 2021
3cefe7c
fix(tests): reset MaxSplits in tests
NamanJain8 Nov 19, 2021
377384b
Cleanup
ahsanbarkati Nov 19, 2021
9fe7bc9
delete split kvs on rollup
NamanJain8 Nov 22, 2021
e0442ff
Merge branch 'naman/perf-split' of github.com:dgraph-io/dgraph into n…
NamanJain8 Nov 22, 2021
75128e2
add BitForbit check at all required places
NamanJain8 Nov 22, 2021
9f8a342
minor fixes: help message, clean comments
NamanJain8 Nov 23, 2021
a192fdc
remove unneeded lines and clear logic in restore
NamanJain8 Nov 24, 2021
972e178
update sroar
NamanJain8 Nov 24, 2021
d12a129
remove comment
NamanJain8 Nov 24, 2021
703e778
fix test
NamanJain8 Nov 24, 2021
d468ce3
update sroar
NamanJain8 Nov 24, 2021
bdf049e
add more checks in test
NamanJain8 Nov 25, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ they form a Raft group and provide synchronous replication.
"It expects the access JWT to be constructed outside dgraph for those users as even "+
"login is denied to them. Additionally, this disables access to environment variables"+
"for minio, aws, etc.").
Flag("max-splits", "How many splits can a single key have, before it is forbidden. "+
"Also known as Jupiter key.").
String())

flag.String("graphql", worker.GraphQLDefaults, z.NewSuperFlagHelp(worker.GraphQLDefaults).
Expand Down
13 changes: 6 additions & 7 deletions dgraph/cmd/bulk/count_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,12 @@ type current struct {

type countIndexer struct {
*reducer
writer *badger.StreamWriter
splitWriter *badger.WriteBatch
splitCh chan *badger.KVList
tmpDb *badger.DB
cur current
countBuf *z.Buffer
wg sync.WaitGroup
writer *badger.StreamWriter
splitCh chan *badger.KVList
tmpDb *badger.DB
cur current
countBuf *z.Buffer
wg sync.WaitGroup
}

// addUid adds the uid from rawKey to a count index if a count index is
Expand Down
82 changes: 42 additions & 40 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ import (
"github.com/dgraph-io/badger/v3"
bo "github.com/dgraph-io/badger/v3/options"
bpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/dgraph-io/sroar"
"github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/golang/snappy"
)

Expand Down Expand Up @@ -85,16 +87,13 @@ func (r *reducer) run() error {

writer := db.NewStreamWriter()
x.Check(writer.Prepare())
// Split lists are written to a separate DB first to avoid ordering issues.
splitWriter := tmpDb.NewManagedWriteBatch()

ci := &countIndexer{
reducer: r,
writer: writer,
splitWriter: splitWriter,
tmpDb: tmpDb,
splitCh: make(chan *bpb.KVList, 2*runtime.NumCPU()),
countBuf: getBuf(r.opt.TmpDir),
reducer: r,
writer: writer,
tmpDb: tmpDb,
splitCh: make(chan *bpb.KVList, 2*runtime.NumCPU()),
countBuf: getBuf(r.opt.TmpDir),
}

partitionKeys := make([][]byte, 0, len(partitions))
Expand Down Expand Up @@ -278,37 +277,39 @@ func (r *reducer) encode(entryCh chan *encodeRequest, closer *z.Closer) {
}
}

const maxSplitBatchLen = 1000

func (r *reducer) writeTmpSplits(ci *countIndexer, wg *sync.WaitGroup) {
defer wg.Done()
splitBatchLen := 0

iwg := &sync.WaitGroup{}
for kvs := range ci.splitCh {
if kvs == nil || len(kvs.Kv) == 0 {
continue
}

for i := 0; i < len(kvs.Kv); i += maxSplitBatchLen {
// flush the write batch when the max batch length is reached to prevent the
// value log from growing over the allowed limit.
if splitBatchLen >= maxSplitBatchLen {
x.Check(ci.splitWriter.Flush())
ci.splitWriter = ci.tmpDb.NewManagedWriteBatch()
splitBatchLen = 0
}

batch := &bpb.KVList{}
if i+maxSplitBatchLen >= len(kvs.Kv) {
batch.Kv = kvs.Kv[i:]
} else {
batch.Kv = kvs.Kv[i : i+maxSplitBatchLen]
b := skl.NewBuilder(int64(kvs.Size()) + 1<<20)
for _, kv := range kvs.Kv {
if err := badger.ValidEntry(ci.tmpDb, kv.Key, kv.Value); err != nil {
glog.Errorf("Invalid Entry. len(key): %d len(val): %d\n",
len(kv.Key), len(kv.Value))
continue
}
splitBatchLen += len(batch.Kv)
x.Check(ci.splitWriter.WriteList(batch))
b.Add(y.KeyWithTs(kv.Key, kv.Version),
y.ValueStruct{
Value: kv.Value,
UserMeta: kv.UserMeta[0],
})
}
iwg.Add(1)
err := x.RetryUntilSuccess(1000, 5*time.Second, func() error {
err := ci.tmpDb.HandoverSkiplist(b.Skiplist(), iwg.Done)
if err != nil {
glog.Errorf("writeTmpSplits: handover skiplist returned error: %v. Retrying...\n",
err)
}
return err
})
x.Check(err)
}
x.Check(ci.splitWriter.Flush())
iwg.Wait()
}

func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *z.Closer) {
Expand Down Expand Up @@ -567,10 +568,15 @@ func (r *reducer) toList(req *encodeRequest) {
}

start, end, num := cbuf.StartOffset(), cbuf.StartOffset(), 0

appendToList := func() {
if num == 0 {
return
}
for _, p := range pl.Postings {
freePosting(p)
}
pl.Reset()
atomic.AddInt64(&r.prog.reduceEdgeCount, int64(num))

pk, err := x.Parse(currentKey)
Expand All @@ -592,7 +598,7 @@ func (r *reducer) toList(req *encodeRequest) {
}
}

bm := sroar.NewBitmap()
var uids []uint64
var lastUid uint64
slice, next := []byte{}, start
for next >= 0 && (next < end || end == -1) {
Expand All @@ -605,16 +611,19 @@ func (r *reducer) toList(req *encodeRequest) {
}
lastUid = uid

bm.Set(uid)
// Don't do set here, because this would be slower for Roaring
// Bitmaps to build with. This might cause memory issues though.
// bm.Set(uid)
uids = append(uids, uid)

if pbuf := me.Plist(); len(pbuf) > 0 {
p := getPosting()
x.Check(p.Unmarshal(pbuf))
pl.Postings = append(pl.Postings, p)
}
}

// We should not do defer FreePack here, because we might be giving ownership of it away if
// we run Rollup.
bm := sroar.FromSortedList(uids)
pl.Bitmap = bm.ToBuffer()
numUids := bm.GetCardinality()

Expand All @@ -626,7 +635,6 @@ func (r *reducer) toList(req *encodeRequest) {
// the full pb.Posting type is used (which pb.y contains the
// delta packed UID list).
if numUids == 0 {
// No need to FrePack here because we are reusing alloc.
return
}

Expand All @@ -648,7 +656,6 @@ func (r *reducer) toList(req *encodeRequest) {
}

if posting.ShouldSplit(pl) {
// Give ownership of pl.Pack away to list. Rollup would deallocate the Pack.
l := posting.NewList(y.Copy(currentKey), pl, writeVersionTs)
kvs, err := l.Rollup(nil)
x.Check(err)
Expand All @@ -669,11 +676,6 @@ func (r *reducer) toList(req *encodeRequest) {
kv.StreamId = r.streamIdFor(pk.Attr)
badger.KVToBuffer(kv, kvBuf)
}

for _, p := range pl.Postings {
freePosting(p)
}
pl.Reset()
}

for end >= 0 {
Expand Down
7 changes: 7 additions & 0 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/dgraph/ee"
"github.com/dgraph-io/dgraph/filestore"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/ristretto/z"
Expand Down Expand Up @@ -121,6 +122,8 @@ func init() {
"Namespace onto which to load the data. If not set, will preserve the namespace."+
" When using this flag to load data into specific namespace, make sure that the "+
"load data do not have ACL data.")
flag.Int64("max-splits", 1000,
"How many splits can a single key have, before it is forbidden. Also known as Jupiter key.")

flag.String("badger", BulkBadgerDefaults, z.NewSuperFlagHelp(BulkBadgerDefaults).
Head("Badger options (Refer to badger documentation for all possible options)").
Expand Down Expand Up @@ -178,6 +181,10 @@ func run() {
Badger: bopts,
}

// set MaxSplits because while bulk-loading alpha won't be running and rollup would not be
// able to pick value for max-splits from x.Config.Limit.
posting.MaxSplits = Bulk.Conf.GetInt("max-splits")

x.PrintVersion()
if opt.Version {
os.Exit(0)
Expand Down
3 changes: 2 additions & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,8 @@ func printKeys(db *badger.DB) {
}
switch item.UserMeta() {
// This is rather a default case as one of the 4 bit must be set.
case posting.BitCompletePosting, posting.BitEmptyPosting, posting.BitSchemaPosting:
case posting.BitCompletePosting, posting.BitEmptyPosting, posting.BitSchemaPosting,
posting.BitForbidPosting:
sz += item.EstimatedSize()
break LOOP
case posting.BitDeltaPosting:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgraph-io/sroar v0.0.0-20210930201544-8a9a0351f20f
github.com/dgraph-io/sroar v0.0.0-20211124172931-39228b21f455
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a h1:2+hTlwc5y
github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgraph-io/sroar v0.0.0-20210930201544-8a9a0351f20f h1:/7NA7mug98b6QkBU+3BEfun5EpegFVx7cRZU86vTouQ=
github.com/dgraph-io/sroar v0.0.0-20210930201544-8a9a0351f20f/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE=
github.com/dgraph-io/sroar v0.0.0-20211124172931-39228b21f455 h1:BQ7LGEKBpSU83FW1qQJS5aN2vYQ2v8nMElVrovG3lzk=
github.com/dgraph-io/sroar v0.0.0-20211124172931-39228b21f455/go.mod h1:bdNPtQmcxoIQVkZEWZvX0n0/IDlHFab397xdBlP4OoE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
Expand Down
2 changes: 1 addition & 1 deletion posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,

var wg sync.WaitGroup
wg.Add(1)
pstore.HandoverSkiplist(sl, wg.Done)
require.NoError(t, pstore.HandoverSkiplist(sl, wg.Done))
wg.Wait()
}

Expand Down
Loading