From 50d8310109c8a745d6d1349d640c047e69e64c71 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 22 May 2020 14:45:33 -0700 Subject: [PATCH] Use pre-allocated protobufs during backups. (#5404) Backups convert the values stored on disk into an intermediate format (for backwards compatibility reasons). This creates a lot of pb.PostingList and pb.BackupPostingList objects that are discarded almost immediately. Using preallocated protobufs (one per thread used by the stream object) should avoid these allocations altogether. Fixes DGRAPH-1312 --- go.mod | 2 +- go.sum | 18 +++++++-------- posting/list.go | 8 +++---- worker/backup_ee.go | 4 ++-- worker/backup_processor.go | 46 +++++++++++++++++++++++++++++++++----- 5 files changed, 55 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 407ada6e1f2..08d02af29a8 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592 + github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27 github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 github.com/dgraph-io/ristretto v0.0.2 github.com/dgrijalva/jwt-go v3.2.0+incompatible diff --git a/go.sum b/go.sum index e312e0a618a..111eef29ddf 100644 --- a/go.sum +++ b/go.sum @@ -78,15 +78,16 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515084818-8ee988574454 
h1:xxDt2YtzZ2tVCWVk4tcYyKtZVtWRcScAcsbNhHcE/Qg= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515084818-8ee988574454/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515182826-fafb627b4ce4 h1:0hhuliB6NlxxmqQdrl0MkZi+qjibu69uPKtckRn8E0I= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515182826-fafb627b4ce4/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592 h1:j6j3yunDbktI4H3tbj3grt2enO4EPbhstU6Tb8HwqdQ= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200512223356-b7913a671128 h1:S0f6h0t9A3Y+5e7VGMKS/1KVFkAYeNFu3eSpIKHwoQY= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200512223356-b7913a671128/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200518184454-adeff4ecf354 h1:yh6VBtOVenGM/trAufglMpXcTolsFkDa7RZnv5pVqrU= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200518184454-adeff4ecf354/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200518195549-7d06ee67fa2a h1:NixEXAVZF7ik777mj6EKrCCjTxgQ0jWQEHThs09NwFw= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200518195549-7d06ee67fa2a/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27 h1:2oSyH418QYuj2IhW5XWvSJqu5S2wT6czY2ZmPcLhgB4= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nMIukDm/WEvdobPLl0LgeDd/JE66+24jBks= github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig= -github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 
h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= @@ -335,7 +336,6 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -405,7 +405,6 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -510,7 +509,6 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8= diff --git a/posting/list.go b/posting/list.go index 038c064eb4b..bb5118d260e 100644 --- a/posting/list.go +++ b/posting/list.go @@ -1490,17 +1490,15 @@ func (l *List) PartSplits() []uint64 { } // ToBackupPostingList converts a posting list into its representation used for storing backups. -func ToBackupPostingList(l *pb.PostingList) *pb.BackupPostingList { - bl := pb.BackupPostingList{} - if l == nil { - return &bl +func ToBackupPostingList(l *pb.PostingList, bl *pb.BackupPostingList) { + if l == nil || bl == nil { + return } bl.Uids = codec.Decode(l.Pack, 0) bl.Postings = l.Postings bl.CommitTs = l.CommitTs bl.Splits = l.Splits - return &bl } // FromBackupPostingList converts a posting list in the format used for backups to a diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 7a2c467e612..b39e7979ae9 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -50,7 +50,7 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.Status, return nil, err } - bp := &BackupProcessor{DB: pstore, Request: req} + bp := NewBackupProcessor(pstore, req) return bp.WriteBackup(ctx) } @@ -185,7 +185,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull } m.Encrypted = (x.WorkerConfig.EncryptionKey != nil) - bp := &BackupProcessor{Request: req} + bp := NewBackupProcessor(nil, req) return bp.CompleteBackup(ctx, &m) } diff --git a/worker/backup_processor.go 
b/worker/backup_processor.go index f29c08d2cc4..be2fd61045e 100644 --- a/worker/backup_processor.go +++ b/worker/backup_processor.go @@ -33,12 +33,40 @@ import ( "github.com/dgraph-io/dgraph/x" ) +const ( + // backupNumGo is the number of go routines used by the backup stream writer. + backupNumGo = 16 +) + // BackupProcessor handles the different stages of the backup process. type BackupProcessor struct { // DB is the Badger pstore managed by this node. DB *badger.DB // Request stores the backup request containing the parameters for this backup. Request *pb.BackupRequest + + // plList is an array of pre-allocated pb.PostingList objects. + plList []*pb.PostingList + // bplList is an array of pre-allocated pb.BackupPostingList objects. + bplList []*pb.BackupPostingList +} + +func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor { + bp := &BackupProcessor{ + DB: db, + Request: req, + plList: make([]*pb.PostingList, backupNumGo), + bplList: make([]*pb.BackupPostingList, backupNumGo), + } + + for i := range bp.plList { + bp.plList[i] = &pb.PostingList{} + } + for i := range bp.bplList { + bp.bplList[i] = &pb.BackupPostingList{} + } + + return bp } // LoadResult holds the output of a Load operation. 
@@ -94,6 +122,7 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.Status, error) stream := pr.DB.NewStreamAt(pr.Request.ReadTs) stream.LogPrefix = "Dgraph.Backup" + stream.NumGo = backupNumGo stream.KeyToList = pr.toBackupList stream.ChooseKey = func(item *badger.Item) bool { parsedKey, err := x.Parse(item.Key()) @@ -187,7 +216,8 @@ func (m *Manifest) GoString() string { m.Since, m.Groups, m.Encrypted) } -func (pr *BackupProcessor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { +func (pr *BackupProcessor) toBackupList(key []byte, itr *badger.Iterator) ( + *bpb.KVList, error) { list := &bpb.KVList{} item := itr.Item() @@ -218,7 +248,7 @@ func (pr *BackupProcessor) toBackupList(key []byte, itr *badger.Iterator) (*bpb. } kv.Key = backupKey - backupPl, err := toBackupPostingList(kv.Value) + backupPl, err := pr.toBackupPostingList(kv.Value, itr.ThreadId) if err != nil { return nil, err } @@ -262,12 +292,18 @@ func toBackupKey(key []byte) ([]byte, error) { return backupKey, nil } -func toBackupPostingList(val []byte) ([]byte, error) { - pl := &pb.PostingList{} +func (pr *BackupProcessor) toBackupPostingList(val []byte, threadNum int) ([]byte, error) { + pl := pr.plList[threadNum] + bpl := pr.bplList[threadNum] + pl.Reset() + bpl.Reset() + if err := pl.Unmarshal(val); err != nil { return nil, errors.Wrapf(err, "while reading posting list") } - backupVal, err := posting.ToBackupPostingList(pl).Marshal() + posting.ToBackupPostingList(pl, bpl) + backupVal, err := bpl.Marshal() + if err != nil { return nil, errors.Wrapf(err, "while converting posting list for backup") }