From b6e077ac1e846c37889c0cab53e0aa043b61eaba Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 22 May 2020 14:45:33 -0700 Subject: [PATCH] Use pre-allocated protobufs during backups. Backups convert the values stored in disk into an intermediate format (for backwards compatibility reasons). This creates a lot of pb.PostingList and pb.BackupPostingList objects that are discarded almost immediatly. Using preallocated protobufs (one per thread used by the stream object) should avoid these allocations altogether. Fixes DGRAPH-1312 --- ee/backup/backup.go | 43 +++++++++++++++++++++++++++++++++++++++---- go.mod | 2 +- go.sum | 4 ++-- posting/list.go | 8 +++----- worker/backup_ee.go | 2 +- 5 files changed, 46 insertions(+), 13 deletions(-) diff --git a/ee/backup/backup.go b/ee/backup/backup.go index c6a0d044b9f..19d6c42e716 100644 --- a/ee/backup/backup.go +++ b/ee/backup/backup.go @@ -34,12 +34,40 @@ import ( "github.com/dgraph-io/dgraph/x" ) +const ( + // backupNumGo is the number of go routines used by the backup stream writer. + backupNumGo = 16 +) + // Processor handles the different stages of the backup process. type Processor struct { // DB is the Badger pstore managed by this node. DB *badger.DB // Request stores the backup request containing the parameters for this backup. Request *pb.BackupRequest + + // plList is an array of pre-allocated pb.PostingList objects. + plList []*pb.PostingList + // bplList is an array of pre-allocated pb.BackupPostingList objects. + bplList []*pb.BackupPostingList +} + +func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *Processor { + bp := &Processor{ + DB: db, + Request: req, + plList: make([]*pb.PostingList, backupNumGo), + bplList: make([]*pb.BackupPostingList, backupNumGo), + } + + for i := range bp.plList { + bp.plList[i] = &pb.PostingList{} + } + for i := range bp.bplList { + bp.bplList[i] = &pb.BackupPostingList{} + } + + return bp } // LoadResult holds the output of a Load operation. @@ -141,6 +169,7 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) { stream := pr.DB.NewStreamAt(pr.Request.ReadTs) stream.LogPrefix = "Dgraph.Backup" + stream.NumGo = backupNumGo stream.KeyToList = pr.toBackupList stream.ChooseKey = func(item *badger.Item) bool { parsedKey, err := x.Parse(item.Key()) @@ -259,7 +288,7 @@ func (pr *Processor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList } kv.Key = backupKey - backupPl, err := toBackupPostingList(kv.Value) + backupPl, err := pr.toBackupPostingList(kv.Value, itr.ThreadId) if err != nil { return nil, err } @@ -304,12 +333,18 @@ func toBackupKey(key []byte) ([]byte, error) { return backupKey, nil } -func toBackupPostingList(val []byte) ([]byte, error) { - pl := &pb.PostingList{} +func (pr *Processor) toBackupPostingList(val []byte, threadNum int) ([]byte, error) { + pl := pr.plList[threadNum] + bpl := pr.bplList[threadNum] + pl.Reset() + bpl.Reset() + if err := pl.Unmarshal(val); err != nil { return nil, errors.Wrapf(err, "while reading posting list") } - backupVal, err := posting.ToBackupPostingList(pl).Marshal() + posting.ToBackupPostingList(pl, bpl) + backupVal, err := bpl.Marshal() + if err != nil { return nil, errors.Wrapf(err, "while converting posting list for backup") } diff --git a/go.mod b/go.mod index cdb0aff7a00..d7817c90967 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592 + github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27 github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 github.com/dgrijalva/jwt-go v3.2.0+incompatible diff --git a/go.sum b/go.sum index 63d1cb1faa1..ade58743500 100644 --- a/go.sum +++ b/go.sum @@ -62,8 +62,8 @@ github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkE github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592 h1:j6j3yunDbktI4H3tbj3grt2enO4EPbhstU6Tb8HwqdQ= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27 h1:2oSyH418QYuj2IhW5XWvSJqu5S2wT6czY2ZmPcLhgB4= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I= github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= diff --git a/posting/list.go b/posting/list.go index 6cd02c3d230..96b9e9973cd 100644 --- a/posting/list.go +++ b/posting/list.go @@ -1486,17 +1486,15 @@ func (l *List) PartSplits() []uint64 { } // ToBackupPostingList converts a posting list into its representation used for storing backups. -func ToBackupPostingList(l *pb.PostingList) *pb.BackupPostingList { - bl := pb.BackupPostingList{} - if l == nil { - return &bl +func ToBackupPostingList(l *pb.PostingList, bl *pb.BackupPostingList) { + if l == nil || bl == nil { + return } bl.Uids = codec.Decode(l.Pack, 0) bl.Postings = l.Postings bl.CommitTs = l.CommitTs bl.Splits = l.Splits - return &bl } // FromBackupPostingList converts a posting list in the format used for backups to a diff --git a/worker/backup_ee.go b/worker/backup_ee.go index de0442c9306..48e0ba6be3b 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -46,7 +46,7 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.Status, return nil, err } - bp := &backup.Processor{DB: pstore, Request: req} + bp := backup.NewBackupProcessor(pstore, req) return bp.WriteBackup(ctx) }