Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(Backup): Improve backup performance #7601

Merged
merged 15 commits into from
Apr 6, 2021
9 changes: 2 additions & 7 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,22 +405,17 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {

// DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
// calloc'ed and SHOULD be freed up by calling buffer.Release().
func DecodeToBuffer(pack *pb.UidPack, seek uint64) *z.Buffer {
buf, err := z.NewBufferWith(256<<20, 32<<30, z.UseCalloc, "Codec.DecodeToBuffer")
x.Check(err)
buf.AutoMmapAfter(1 << 30)

func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
var last uint64
tmp := make([]byte, 16)
dec := Decoder{Pack: pack}
for uids := dec.Seek(seek, SeekStart); len(uids) > 0; uids = dec.Next() {
for uids := dec.Seek(0, SeekStart); len(uids) > 0; uids = dec.Next() {
for _, u := range uids {
n := binary.PutUvarint(tmp, u-last)
x.Check2(buf.Write(tmp[:n]))
last = u
}
}
return buf
}

func match32MSB(num1, num2 uint64) bool {
Expand Down
11 changes: 9 additions & 2 deletions codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
humanize "github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -74,7 +75,10 @@ func TestBufferUidPack(t *testing.T) {
// Some edge case tests.
pack := Encode([]uint64{}, 128)
FreePack(pack)
buf := DecodeToBuffer(&pb.UidPack{}, 0)

buf := z.NewBuffer(10<<10, "TestBufferUidPack")
defer buf.Release()
DecodeToBuffer(buf, &pb.UidPack{})
require.Equal(t, 0, buf.LenNoPadding())
require.NoError(t, buf.Release())

Expand All @@ -90,7 +94,10 @@ func TestBufferUidPack(t *testing.T) {
actual := Decode(pack, 0)
require.Equal(t, expected, actual)

actualbuffer := DecodeToBuffer(pack, 0)
actualbuffer := z.NewBuffer(10<<10, "TestBufferUidPack")
defer actualbuffer.Release()

DecodeToBuffer(actualbuffer, pack)
enc := EncodeFromBuffer(actualbuffer.Bytes(), 256)
require.Equal(t, ExactLen(pack), ExactLen(enc))

Expand Down
4 changes: 3 additions & 1 deletion ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func runLsbackupCmd() error {
type backupEntry struct {
Path string `json:"path"`
Since uint64 `json:"since"`
ReadTs uint64 `json:"read_ts"`
BackupId string `json:"backup_id"`
BackupNum uint64 `json:"backup_num"`
Encrypted bool `json:"encrypted"`
Expand All @@ -290,7 +291,8 @@ func runLsbackupCmd() error {

be := backupEntry{
Path: manifest.Path,
Since: manifest.Since,
Since: manifest.SinceTsDeprecated,
ReadTs: manifest.ReadTs,
BackupId: manifest.BackupId,
BackupNum: manifest.BackupNum,
Encrypted: manifest.Encrypted,
Expand Down
4 changes: 3 additions & 1 deletion graphql/admin/list_backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type group struct {
type manifest struct {
Type string `json:"type,omitempty"`
Since uint64 `json:"since,omitempty"`
ReadTs uint64 `json:"read_ts,omitempty"`
Groups []*group `json:"groups,omitempty"`
BackupId string `json:"backupId,omitempty"`
BackupNum uint64 `json:"backupNum,omitempty"`
Expand Down Expand Up @@ -107,7 +108,8 @@ func convertManifests(manifests []*worker.Manifest) []*manifest {
for i, m := range manifests {
res[i] = &manifest{
Type: m.Type,
Since: m.Since,
Since: m.SinceTsDeprecated,
ReadTs: m.ReadTs,
BackupId: m.BackupId,
BackupNum: m.BackupNum,
Path: m.Path,
Expand Down
17 changes: 7 additions & 10 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ func (l *List) Rollup(alloc *z.Allocator) ([]*bpb.KV, error) {

// ToBackupPostingList uses rollup to generate a single list with no splits.
// It's used during backup so that each backed up posting list is stored in a single key.
func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator) (*bpb.KV, error) {
func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator, buf *z.Buffer) (*bpb.KV, error) {
bl.Reset()
l.RLock()
defer l.RUnlock()
Expand All @@ -868,15 +868,12 @@ func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator)
defer out.free()

ol := out.plist
// Encode uids to []byte instead of []uint64 if we have more than 1000
// uids. We do this to improve the memory usage.
if codec.ApproxLen(ol.Pack) > 1024 {
buf := codec.DecodeToBuffer(ol.Pack, 0)
defer buf.Release()
bl.UidBytes = buf.Bytes()
} else {
bl.Uids = codec.Decode(ol.Pack, 0)
}

// Encode uids to []byte instead of []uint64. This helps improve memory usage.
buf.Reset()
codec.DecodeToBuffer(buf, ol.Pack)
bl.UidBytes = buf.Bytes()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we change this heuristic? This change would mean that bl.Uids remains for backward compatibility only.

Copy link
Contributor Author

@jarifibrahim jarifibrahim Apr 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@manishrjain made this change. We'll have to check with him.

bl.Postings = ol.Postings
bl.CommitTs = ol.CommitTs
bl.Splits = ol.Splits
Expand Down
5 changes: 4 additions & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/dgraph-io/badger/v3"
bpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/ristretto/z"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -1340,7 +1341,9 @@ func TestSingleListRollup(t *testing.T) {
}

var bl pb.BackupPostingList
kv, err := ol.ToBackupPostingList(&bl, nil)
buf := z.NewBuffer(10<<10, "TestSingleListRollup")
defer buf.Release()
kv, err := ol.ToBackupPostingList(&bl, nil, buf)
require.NoError(t, err)
require.Equal(t, 1, len(kv.UserMeta))
require.Equal(t, BitCompletePosting, kv.UserMeta[0])
Expand Down
24 changes: 18 additions & 6 deletions worker/backup_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ func init() {
type predicateSet map[string]struct{}

// Manifest records backup details, these are values used during restore.
// Since is the timestamp from which the next incremental backup should start (it's set
// to the readTs of the current backup).
// ReadTs will be used to create the next incremental backup.
// Groups are the IDs of the groups involved.
type Manifest struct {
sync.Mutex
//Type is the type of backup, either full or incremental.
Type string `json:"type"`
// Since is the timestamp at which this backup was taken. It's called Since
// because it will become the timestamp from which to backup in the next
// incremental backup.
Since uint64 `json:"since"`
// SinceTsDeprecated is kept for backward compatibility. Use readTs instead of sinceTs.
SinceTsDeprecated uint64 `json:"since"`
// ReadTs is the timestamp at which this backup was taken. This would be
// the since timestamp for the next incremental backup.
ReadTs uint64 `json:"read_ts"`
// Groups is the map of valid groups to predicates at the time the backup was created.
Groups map[uint32][]string `json:"groups"`
// BackupId is a unique ID assigned to all the backups in the same series
Expand All @@ -71,6 +71,18 @@ type Manifest struct {
// DropOperations lists the various DROP operations that took place since the last backup.
// These are used during restore to redo those operations before applying the backup.
DropOperations []*pb.DropOperation `json:"drop_operations"`
// Compression keeps track of the compression that was used for the data.
Compression string `json:"compression"`
}

// ValidReadTs function returns the valid read timestamp. The backup can have
// the readTs=0 if the backup was done on an older version of dgraph. The
// SinceTsDecprecated is kept for backward compatibility.
func (m *Manifest) ValidReadTs() uint64 {
NamanJain8 marked this conversation as resolved.
Show resolved Hide resolved
if m.ReadTs == 0 {
return m.SinceTsDeprecated
}
return m.ReadTs
}

type MasterManifest struct {
Expand Down
58 changes: 36 additions & 22 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupR
return nil, errors.Wrapf(err, "cannot start backup operation")
}
defer closer.Done()

bp := NewBackupProcessor(pstore, req)
defer bp.Close()

return bp.WriteBackup(ctx)
return bp.WriteBackup(closer.Ctx())
}

// BackupGroup backs up the group specified in the backup request.
Expand Down Expand Up @@ -161,8 +162,12 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error
return err
}

req.SinceTs = latestManifest.Since
// Use the readTs as the sinceTs for the next backup. If not found, use the
// SinceTsDeprecated value from the latest manifest.
req.SinceTs = latestManifest.ValidReadTs()

if forceFull {
// To force a full backup we'll set the sinceTs to zero.
req.SinceTs = 0
} else {
if x.WorkerConfig.EncryptionKey != nil {
Expand Down Expand Up @@ -201,35 +206,43 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error
}

glog.Infof(
"Created backup request: read_ts:%d since_ts:%d unix_ts:\"%s\" destination:\"%s\". Groups=%v\n",
"Created backup request: read_ts:%d since_ts:%d unix_ts:%q destination:%q. Groups=%v\n",
req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

resCh := make(chan BackupRes, len(state.Groups))
for _, gid := range groups {
br := proto.Clone(req).(*pb.BackupRequest)
br.GroupId = gid
br.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
res, err := BackupGroup(ctx, req)
resCh <- BackupRes{res: res, err: err}
}(br)
}

var dropOperations []*pb.DropOperation
for range groups {
if backupRes := <-resCh; backupRes.err != nil {
glog.Errorf("Error received during backup: %v", backupRes.err)
return backupRes.err
} else {
dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...)
{ // This is the code which sends out Backup requests and waits for them to finish.
resCh := make(chan BackupRes, len(state.Groups))
for _, gid := range groups {
br := proto.Clone(req).(*pb.BackupRequest)
br.GroupId = gid
br.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
res, err := BackupGroup(ctx, req)
resCh <- BackupRes{res: res, err: err}
}(br)
}

for range groups {
if backupRes := <-resCh; backupRes.err != nil {
glog.Errorf("Error received during backup: %v", backupRes.err)
return backupRes.err
} else {
dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...)
}
}
}

dir := fmt.Sprintf(backupPathFmt, req.UnixTs)
m := Manifest{Since: req.ReadTs, Groups: predMap, Version: x.DgraphVersion,
DropOperations: dropOperations, Path: dir}
m := Manifest{
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
DropOperations: dropOperations,
Path: dir,
Compression: "snappy",
}
if req.SinceTs == 0 {
m.Type = "full"
m.BackupId = x.GetRandomName(1)
Expand All @@ -242,6 +255,7 @@ func doBackup(ctx context.Context, req *pb.BackupRequest, forceFull bool) error
m.Encrypted = (x.WorkerConfig.EncryptionKey != nil)

bp := NewBackupProcessor(nil, req)
defer bp.Close()
err = bp.CompleteBackup(ctx, &m)

if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions worker/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type UriHandler interface {
// These function calls are used by both Create and Load.
io.WriteCloser

// BytesWritten returns the number of bytes written.
BytesWritten() int

// GetManifest returns the master manifest, containing information about all the
// backups. If the backup directory is using old formats (version < 21.03) of manifests,
// then it will return a consolidated master manifest.
Expand Down
Loading