Skip to content

Commit

Permalink
Store group to predicate mapping as part of the backup manifest. (hyp…
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr authored and dna2github committed Jul 19, 2019
1 parent 837646d commit 0d6b05c
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 388 deletions.
25 changes: 22 additions & 3 deletions dgraph/cmd/alpha/admin_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,21 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
req.SinceTs = 0
}

groups := worker.KnownGroups()
// Update the membership state to get the latest mapping of groups to predicates.
if err := worker.UpdateMembershipState(ctx); err != nil {
return err
}
state := worker.GetMembershipState()
var groups []uint32
for gid := range state.Groups {
groups = append(groups, gid)
}

glog.Infof("Created backup request: %s. Groups=%v\n", &req, groups)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

errCh := make(chan error, len(groups))
errCh := make(chan error, len(state.Groups))
for _, gid := range groups {
req := req
req.GroupId = gid
Expand All @@ -132,7 +141,17 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
}
}

m := backup.Manifest{Groups: groups, Since: req.ReadTs}
// Convert state into a map for writing into the manifest.
manifestGroups := make(map[uint32][]string)
for gid, group := range state.Groups {
var preds []string
for key := range group.Tablets {
preds = append(preds, key)
}
manifestGroups[gid] = preds
}

m := backup.Manifest{Since: req.ReadTs, Groups: manifestGroups}
if req.SinceTs == 0 {
m.Type = "full"
m.BackupId = x.GetRandomName(1)
Expand Down
6 changes: 3 additions & 3 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Manifest struct {
// incremental backup.
Since uint64 `json:"since"`
// Groups is the list of valid groups at the time the backup was created.
Groups []uint32 `json:"groups"`
Groups map[uint32][]string `json:"groups"`
// BackupId is a unique ID assigned to all the backups in the same series
// (from the first full backup to the last incremental backup).
BackupId string `json:"backup_id"`
Expand All @@ -64,8 +64,8 @@ type Manifest struct {
// retrieval to stream.Orchestrate. The writer will create all the fd's needed to
// collect the data and later move to the target.
// Returns errors on failure, nil on success.
func (pr *Processor) WriteBackup(ctx context.Context) (*pb.BackupResponse, error) {
var emptyRes pb.BackupResponse
func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
var emptyRes pb.Status

if err := ctx.Err(); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion ee/backup/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, er
}

path := filepath.Dir(manifests[i].Path)
for _, groupId := range manifest.Groups {
for groupId := range manifest.Groups {
file := filepath.Join(path, backupName(manifest.Since, groupId))
fp, err := os.Open(file)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion ee/backup/s3_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, erro
}

path := filepath.Dir(manifests[i].Path)
for _, groupId := range manifest.Groups {
for groupId := range manifest.Groups {
object := filepath.Join(path, backupName(manifest.Since, groupId))
reader, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ service Worker {
rpc StreamSnapshot (stream Snapshot) returns (stream KVS) {}
rpc Sort (SortMessage) returns (SortResult) {}
rpc Schema (SchemaRequest) returns (SchemaResult) {}
rpc Backup (BackupRequest) returns (BackupResponse) {}
rpc Backup (BackupRequest) returns (Status) {}
rpc Export (ExportRequest) returns (Status) {}
rpc ReceivePredicate(stream KVS) returns (api.Payload) {}
rpc MovePredicate(MovePredicatePayload) returns (api.Payload) {}
Expand Down Expand Up @@ -488,10 +488,6 @@ message BackupRequest {
bool anonymous = 9;
}

// Dummy empty response
message BackupResponse {
}

message ExportRequest {
uint32 group_id = 1; // Group id to back up.
uint64 read_ts = 2;
Expand Down
607 changes: 239 additions & 368 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion worker/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// Backup implements the Worker interface.
func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupResponse, error) {
func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.Status, error) {
glog.Warningf("Backup failed: %v", x.ErrNotSupported)
return nil, x.ErrNotSupported
}
9 changes: 3 additions & 6 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ import (
)

// Backup handles a request coming from another node.
func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (
*pb.BackupResponse, error) {

func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.Status, error) {
glog.V(2).Infof("Received backup request via Grpc: %+v", req)
return backupCurrentGroup(ctx, req)
}

func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (
*pb.BackupResponse, error) {
func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.Status, error) {
glog.Infof("Backup request: group %d at %d", req.GroupId, req.ReadTs)
if err := ctx.Err(); err != nil {
glog.Errorf("Context error during backup: %v\n", err)
Expand All @@ -54,7 +51,7 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (
}

// BackupGroup backs up the group specified in the backup request.
func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.BackupResponse, error) {
func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.Status, error) {
glog.V(2).Infof("Sending backup request: %+v\n", in)
if groups().groupId() == in.GroupId {
return backupCurrentGroup(ctx, in)
Expand Down
8 changes: 8 additions & 0 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ func MaxLeaseId() uint64 {
return g.state.MaxLeaseId
}

// GetMembershipState returns the current membership state.
func GetMembershipState() *pb.MembershipState {
g := groups()
g.RLock()
defer g.RUnlock()
return proto.Clone(g.state).(*pb.MembershipState)
}

// UpdateMembershipState contacts zero for an update on membership state.
func UpdateMembershipState(ctx context.Context) error {
g := groups()
Expand Down

0 comments on commit 0d6b05c

Please sign in to comment.