Skip to content

Commit

Permalink
Use a sync.Pool to allocate KVs during backup. (#5579)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr authored Jun 12, 2020
1 parent ec791b0 commit 3202d24
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 14 deletions.
11 changes: 7 additions & 4 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,26 +810,29 @@ func (l *List) Rollup() ([]*bpb.KV, error) {

// SingleListRollup works like rollup but generates a single list with no splits.
// It's used during backup so that each backed up posting list is stored in a single key.
func (l *List) SingleListRollup() (*bpb.KV, error) {
func (l *List) SingleListRollup(kv *bpb.KV) error {
if kv == nil {
return errors.Errorf("passed kv pointer cannot be nil")
}

l.RLock()
defer l.RUnlock()

out, err := l.rollup(math.MaxUint64, false)
if err != nil {
return nil, errors.Wrapf(err, "failed when calling List.rollup")
return errors.Wrapf(err, "failed when calling List.rollup")
}
// out is only nil when the list's minTs is greater than readTs but readTs
// is math.MaxUint64 so that's not possible. Assert that's true.
x.AssertTrue(out != nil)

kv := &bpb.KV{}
kv.Version = out.newMinTs
kv.Key = l.key
val, meta := marshalPostingList(out.plist)
kv.UserMeta = []byte{meta}
kv.Value = val

return kv, nil
return nil
}

func (out *rollupOutput) marshalPostingListPart(
Expand Down
3 changes: 2 additions & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,8 @@ func TestSingleListRollup(t *testing.T) {
ol, commits := createMultiPartList(t, size, true)

// Roll list into a single list.
kv, err := ol.SingleListRollup()
kv := &bpb.KV{}
err := ol.SingleListRollup(kv)
require.NoError(t, err)
require.Equal(t, 1, len(kv.UserMeta))
require.Equal(t, BitCompletePosting, kv.UserMeta[0])
Expand Down
32 changes: 23 additions & 9 deletions worker/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"net/url"
"sync"

"github.com/dgraph-io/badger/v2"
bpb "github.com/dgraph-io/badger/v2/pb"
Expand Down Expand Up @@ -49,6 +50,8 @@ type BackupProcessor struct {
plList []*pb.PostingList
// bplList is an array of pre-allocated pb.BackupPostingList objects.
bplList []*pb.BackupPostingList
// kvPool
kvPool *sync.Pool
}

func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor {
Expand All @@ -57,6 +60,11 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor {
Request: req,
plList: make([]*pb.PostingList, backupNumGo),
bplList: make([]*pb.BackupPostingList, backupNumGo),
kvPool: &sync.Pool{
New: func() interface{} {
return &bpb.KV{}
},
},
}

for i := range bp.plList {
Expand Down Expand Up @@ -152,7 +160,12 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.Status, error)
maxVersion = kv.Version
}
}
return writeKVList(list, gzWriter)
err := writeKVList(list, gzWriter)

for _, kv := range list.Kv {
pr.kvPool.Put(kv)
}
return err
}

if err := stream.Orchestrate(context.Background()); err != nil {
Expand Down Expand Up @@ -230,14 +243,17 @@ func (pr *BackupProcessor) toBackupList(key []byte, itr *badger.Iterator) (
return list, nil
}

kv := pr.kvPool.Get().(*bpb.KV)
kv.Reset()

switch item.UserMeta() {
case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
l, err := posting.ReadPostingList(key, itr)
if err != nil {
return nil, errors.Wrapf(err, "while reading posting list")
}

kv, err := l.SingleListRollup()
err = l.SingleListRollup(kv)
if err != nil {
return nil, errors.Wrapf(err, "while rolling up list")
}
Expand Down Expand Up @@ -265,13 +281,11 @@ func (pr *BackupProcessor) toBackupList(key []byte, itr *badger.Iterator) (
return nil, err
}

kv := &bpb.KV{
Key: backupKey,
Value: valCopy,
UserMeta: []byte{item.UserMeta()},
Version: item.Version(),
ExpiresAt: item.ExpiresAt(),
}
kv.Key = backupKey
kv.Value = valCopy
kv.UserMeta = []byte{item.UserMeta()}
kv.Version = item.Version()
kv.ExpiresAt = item.ExpiresAt()
list.Kv = append(list.Kv, kv)
default:
return nil, errors.Errorf(
Expand Down

0 comments on commit 3202d24

Please sign in to comment.