From b6e077ac1e846c37889c0cab53e0aa043b61eaba Mon Sep 17 00:00:00 2001
From: Martin Martinez Rivera <martinmr@dgraph.io>
Date: Fri, 22 May 2020 14:45:33 -0700
Subject: [PATCH] Use pre-allocated protobufs during backups.

Backups convert the values stored in disk into an intermediate format
(for backwards compatibility reasons). This creates a lot of
pb.PostingList and pb.BackupPostingList objects that are discarded
almost immediatly. Using preallocated protobufs (one per thread used by
the stream object) should avoid these allocations altogether.

Fixes DGRAPH-1312
---
 ee/backup/backup.go | 43 +++++++++++++++++++++++++++++++++++++++----
 go.mod              |  2 +-
 go.sum              |  4 ++--
 posting/list.go     |  8 +++-----
 worker/backup_ee.go |  2 +-
 5 files changed, 46 insertions(+), 13 deletions(-)

diff --git a/ee/backup/backup.go b/ee/backup/backup.go
index c6a0d044b9f..19d6c42e716 100644
--- a/ee/backup/backup.go
+++ b/ee/backup/backup.go
@@ -34,12 +34,40 @@ import (
 	"github.com/dgraph-io/dgraph/x"
 )
 
+const (
+	// backupNumGo is the number of go routines used by the backup stream writer.
+	backupNumGo = 16
+)
+
 // Processor handles the different stages of the backup process.
 type Processor struct {
 	// DB is the Badger pstore managed by this node.
 	DB *badger.DB
 	// Request stores the backup request containing the parameters for this backup.
 	Request *pb.BackupRequest
+
+	// plList is an array of pre-allocated pb.PostingList objects.
+	plList []*pb.PostingList
+	// bplList is an array of pre-allocated pb.BackupPostingList objects.
+	bplList []*pb.BackupPostingList
+}
+
+func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *Processor {
+	bp := &Processor{
+		DB:      db,
+		Request: req,
+		plList:  make([]*pb.PostingList, backupNumGo),
+		bplList: make([]*pb.BackupPostingList, backupNumGo),
+	}
+
+	for i := range bp.plList {
+		bp.plList[i] = &pb.PostingList{}
+	}
+	for i := range bp.bplList {
+		bp.bplList[i] = &pb.BackupPostingList{}
+	}
+
+	return bp
 }
 
 // LoadResult holds the output of a Load operation.
@@ -141,6 +169,7 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
 
 	stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
 	stream.LogPrefix = "Dgraph.Backup"
+	stream.NumGo = backupNumGo
 	stream.KeyToList = pr.toBackupList
 	stream.ChooseKey = func(item *badger.Item) bool {
 		parsedKey, err := x.Parse(item.Key())
@@ -259,7 +288,7 @@ func (pr *Processor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList
 			}
 			kv.Key = backupKey
 
-			backupPl, err := toBackupPostingList(kv.Value)
+			backupPl, err := pr.toBackupPostingList(kv.Value, itr.ThreadId)
 			if err != nil {
 				return nil, err
 			}
@@ -304,12 +333,18 @@ func toBackupKey(key []byte) ([]byte, error) {
 	return backupKey, nil
 }
 
-func toBackupPostingList(val []byte) ([]byte, error) {
-	pl := &pb.PostingList{}
+func (pr *Processor) toBackupPostingList(val []byte, threadNum int) ([]byte, error) {
+	pl := pr.plList[threadNum]
+	bpl := pr.bplList[threadNum]
+	pl.Reset()
+	bpl.Reset()
+
 	if err := pl.Unmarshal(val); err != nil {
 		return nil, errors.Wrapf(err, "while reading posting list")
 	}
-	backupVal, err := posting.ToBackupPostingList(pl).Marshal()
+	posting.ToBackupPostingList(pl, bpl)
+	backupVal, err := bpl.Marshal()
+
 	if err != nil {
 		return nil, errors.Wrapf(err, "while converting posting list for backup")
 	}
diff --git a/go.mod b/go.mod
index cdb0aff7a00..d7817c90967 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,7 @@ require (
 	github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
 	github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
 	github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
-	github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592
+	github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27
 	github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6
 	github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
diff --git a/go.sum b/go.sum
index 63d1cb1faa1..ade58743500 100644
--- a/go.sum
+++ b/go.sum
@@ -62,8 +62,8 @@ github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkE
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592 h1:j6j3yunDbktI4H3tbj3grt2enO4EPbhstU6Tb8HwqdQ=
-github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
+github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27 h1:2oSyH418QYuj2IhW5XWvSJqu5S2wT6czY2ZmPcLhgB4=
+github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
 github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I=
 github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI=
 github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
diff --git a/posting/list.go b/posting/list.go
index 6cd02c3d230..96b9e9973cd 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -1486,17 +1486,15 @@ func (l *List) PartSplits() []uint64 {
 }
 
 // ToBackupPostingList converts a posting list into its representation used for storing backups.
-func ToBackupPostingList(l *pb.PostingList) *pb.BackupPostingList {
-	bl := pb.BackupPostingList{}
-	if l == nil {
-		return &bl
+func ToBackupPostingList(l *pb.PostingList, bl *pb.BackupPostingList) {
+	if l == nil || bl == nil {
+		return
 	}
 
 	bl.Uids = codec.Decode(l.Pack, 0)
 	bl.Postings = l.Postings
 	bl.CommitTs = l.CommitTs
 	bl.Splits = l.Splits
-	return &bl
 }
 
 // FromBackupPostingList converts a posting list in the format used for backups to a
diff --git a/worker/backup_ee.go b/worker/backup_ee.go
index de0442c9306..48e0ba6be3b 100644
--- a/worker/backup_ee.go
+++ b/worker/backup_ee.go
@@ -46,7 +46,7 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.Status,
 		return nil, err
 	}
 
-	bp := &backup.Processor{DB: pstore, Request: req}
+	bp := backup.NewBackupProcessor(pstore, req)
 	return bp.WriteBackup(ctx)
 }