Skip to content

Commit

Permalink
perf(Backup): Improve backup performance (#7601)
Browse files Browse the repository at this point in the history
This PR has the following changes

- Use thread-local buffer for codecs
- Add new ReadTs field to Manifest and use it instead of sinceTs
- Use snappy for backups

(cherry picked from commit 2715d88)
  • Loading branch information
Ibrahim Jarif authored and NamanJain8 committed May 26, 2021
1 parent 37b2211 commit 56a1d62
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 34 deletions.
6 changes: 4 additions & 2 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func runLsbackupCmd() error {
type backupEntry struct {
Path string `json:"path"`
Since uint64 `json:"since"`
ReadTs uint64 `json:"read_ts"`
BackupId string `json:"backup_id"`
BackupNum uint64 `json:"backup_num"`
Encrypted bool `json:"encrypted"`
Expand All @@ -105,7 +106,8 @@ func runLsbackupCmd() error {

be := backupEntry{
Path: manifest.Path,
Since: manifest.Since,
Since: manifest.SinceTsDeprecated,
ReadTs: manifest.ReadTs,
BackupId: manifest.BackupId,
BackupNum: manifest.BackupNum,
Encrypted: manifest.Encrypted,
Expand Down Expand Up @@ -271,7 +273,7 @@ func runExportBackup() error {

in := &pb.ExportRequest{
GroupId: uint32(gid),
ReadTs: latestManifest.Since,
ReadTs: latestManifest.ValidReadTs(),
UnixTs: time.Now().Unix(),
Format: opt.format,
Destination: exportDir,
Expand Down
4 changes: 3 additions & 1 deletion graphql/admin/list_backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type group struct {
type manifest struct {
Type string `json:"type,omitempty"`
Since uint64 `json:"since,omitempty"`
ReadTs uint64 `json:"read_ts,omitempty"`
Groups []*group `json:"groups,omitempty"`
BackupId string `json:"backupId,omitempty"`
BackupNum uint64 `json:"backupNum,omitempty"`
Expand Down Expand Up @@ -107,7 +108,8 @@ func convertManifests(manifests []*worker.Manifest) []*manifest {
for i, m := range manifests {
res[i] = &manifest{
Type: m.Type,
Since: m.Since,
Since: m.SinceTsDeprecated,
ReadTs: m.ReadTs,
BackupId: m.BackupId,
BackupNum: m.BackupNum,
Path: m.Path,
Expand Down
22 changes: 16 additions & 6 deletions worker/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ import (
type predicateSet map[string]struct{}

// Manifest records backup details, these are values used during restore.
// Since is the timestamp from which the next incremental backup should start (it's set
// to the readTs of the current backup).
// ReadTs will be used to create the next incremental backup.
// Groups are the IDs of the groups involved.
type Manifest struct {
sync.Mutex
//Type is the type of backup, either full or incremental.
Type string `json:"type"`
// Since is the timestamp at which this backup was taken. It's called Since
// because it will become the timestamp from which to backup in the next
// incremental backup.
Since uint64 `json:"since"`
// SinceTsDeprecated is kept for backward compatibility. Use readTs instead of sinceTs.
SinceTsDeprecated uint64 `json:"since"`
// ReadTs is the timestamp at which this backup was taken. This would be
// the since timestamp for the next incremental backup.
ReadTs uint64 `json:"read_ts"`
// Groups is the map of valid groups to predicates at the time the backup was created.
Groups map[uint32][]string `json:"groups"`
// BackupId is a unique ID assigned to all the backups in the same series
Expand All @@ -68,6 +68,16 @@ type Manifest struct {
Compression string `json:"compression"`
}

// ValidReadTs function returns the valid read timestamp. The backup can have
// the readTs=0 if the backup was done on an older version of dgraph. The
// SinceTsDecprecated is kept for backward compatibility.
func (m *Manifest) ValidReadTs() uint64 {
if m.ReadTs == 0 {
return m.SinceTsDeprecated
}
return m.ReadTs
}

type MasterManifest struct {
Manifests []*Manifest
}
Expand Down
48 changes: 27 additions & 21 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
return err
}

req.SinceTs = latestManifest.Since
// Use the readTs as the sinceTs for the next backup. If not found, use the
// SinceTsDeprecated value from the latest manifest.
req.SinceTs = latestManifest.ValidReadTs()

if req.ForceFull {
// To force a full backup we'll set the sinceTs to zero.
req.SinceTs = 0
} else {
if x.WorkerConfig.EncryptionKey != nil {
Expand Down Expand Up @@ -196,30 +200,32 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

resCh := make(chan BackupRes, len(state.Groups))
for _, gid := range groups {
br := proto.Clone(req).(*pb.BackupRequest)
br.GroupId = gid
br.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
res, err := BackupGroup(ctx, req)
resCh <- BackupRes{res: res, err: err}
}(br)
}

var dropOperations []*pb.DropOperation
for range groups {
if backupRes := <-resCh; backupRes.err != nil {
glog.Errorf("Error received during backup: %v", backupRes.err)
return backupRes.err
} else {
dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...)
{ // This is the code which sends out Backup requests and waits for them to finish.
resCh := make(chan BackupRes, len(state.Groups))
for _, gid := range groups {
br := proto.Clone(req).(*pb.BackupRequest)
br.GroupId = gid
br.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
res, err := BackupGroup(ctx, req)
resCh <- BackupRes{res: res, err: err}
}(br)
}

for range groups {
if backupRes := <-resCh; backupRes.err != nil {
glog.Errorf("Error received during backup: %v", backupRes.err)
return backupRes.err
} else {
dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...)
}
}
}

dir := fmt.Sprintf(backupPathFmt, req.UnixTs)
m := Manifest{
Since: req.ReadTs,
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
DropOperations: dropOperations,
Expand Down Expand Up @@ -581,8 +587,8 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro

// GoString implements the GoStringer interface for Manifest.
func (m *Manifest) GoString() string {
return fmt.Sprintf(`Manifest{Since: %d, Groups: %v, Encrypted: %v}`,
m.Since, m.Groups, m.Encrypted)
return fmt.Sprintf(`Manifest{Since: %d, ReadTs: %d, Groups: %v, Encrypted: %v}`,
m.SinceTsDeprecated, m.ReadTs, m.Groups, m.Encrypted)
}

func (tl *threadLocal) toBackupList(key []byte, itr *badger.Iterator) (
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func getFilteredManifests(h x.UriHandler, manifests []*Manifest,
for _, m := range manifests {
missingFiles := false
for g := range m.Groups {
path := filepath.Join(m.Path, backupName(m.Since, g))
path := filepath.Join(m.Path, backupName(m.ValidReadTs(), g))
if !h.FileExists(path) {
missingFiles = true
break
Expand Down
2 changes: 1 addition & 1 deletion worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,5 +538,5 @@ func RunOfflineRestore(dir, location, backupId string, keyFile string,
}
}
// TODO: Fix this return value.
return LoadResult{Version: manifest.Since}
return LoadResult{Version: manifest.ValidReadTs()}
}
4 changes: 2 additions & 2 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
if dropAll {
break
}
if manifest.Since == 0 || len(manifest.Groups) == 0 {
if manifest.ValidReadTs() == 0 || len(manifest.Groups) == 0 {
continue
}
for gid := range manifest.Groups {
Expand All @@ -630,7 +630,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {

// Only restore the predicates that were assigned to this group at the time
// of the last backup.
file := filepath.Join(manifest.Path, backupName(manifest.Since, gid))
file := filepath.Join(manifest.Path, backupName(manifest.ValidReadTs(), gid))
br := readerFrom(h, file).WithEncryption(keys.EncKey).WithCompression(manifest.Compression)
if br.err != nil {
return nil, errors.Wrap(br.err, "newBackupReader")
Expand Down

0 comments on commit 56a1d62

Please sign in to comment.