Skip to content

Commit

Permalink
handling restores with wait group
Browse files Browse the repository at this point in the history
  • Loading branch information
aman-bansal committed Jan 5, 2021
1 parent 03b3ea7 commit f733120
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 18 deletions.
17 changes: 6 additions & 11 deletions graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package admin
import (
"context"
"encoding/json"

"github.com/dgraph-io/ristretto/z"
"sync"

"github.com/dgraph-io/dgraph/edgraph"

"github.com/dgraph-io/dgraph/graphql/resolve"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/protos/pb"
Expand Down Expand Up @@ -70,10 +70,9 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
VaultFormat: input.VaultFormat,
}

restoreCloser := z.NewCloser(1)
err = worker.ProcessRestoreRequest(context.Background(), &req, restoreCloser)
wg := &sync.WaitGroup{}
err = worker.ProcessRestoreRequest(context.Background(), &req, wg)
if err != nil {
restoreCloser.Done()
return &resolve.Resolved{
Data: map[string]interface{}{m.Name(): map[string]interface{}{
"code": "Failure",
Expand All @@ -84,12 +83,8 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
}

go func() {
defer restoreCloser.Done()
select {
case <-restoreCloser.HasBeenClosed():
restoreCloser.AddRunning(1)
edgraph.ResetAcl(restoreCloser)
}
wg.Wait()
edgraph.ResetAcl(nil)
}()

return &resolve.Resolved{
Expand Down
5 changes: 3 additions & 2 deletions worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package worker

import (
"context"
"sync"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest) error {
func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest, wg *sync.WaitGroup) error {
glog.Warningf("Restore failed: %v", x.ErrNotSupported)
return x.ErrNotSupported
}
Expand All @@ -39,4 +40,4 @@ func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.S

func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
return nil
}
}
9 changes: 4 additions & 5 deletions worker/online_restore_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"io"
"net/url"
"strings"
"sync"
"time"

"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"

"github.com/dgraph-io/dgraph/conn"
Expand All @@ -40,8 +40,7 @@ const (
)

// ProcessRestoreRequest verifies the backup data and sends a restore proposal to each group.
func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest,
restoreCloser *z.Closer) error {
func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest, wg *sync.WaitGroup) error {
if req == nil {
return errors.Errorf("restore request cannot be nil")
}
Expand Down Expand Up @@ -94,11 +93,11 @@ func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest,

// TODO: prevent partial restores when proposeRestoreOrSend only sends the restore
// request to a subset of groups.
wg.Add(1)
errCh := make(chan error, len(currentGroups))
for _, gid := range currentGroups {
reqCopy := proto.Clone(req).(*pb.RestoreRequest)
reqCopy.GroupId = gid

go func() {
errCh <- tryRestoreProposal(ctx, reqCopy)
}()
Expand All @@ -109,8 +108,8 @@ func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest,
if err := <-errCh; err != nil {
glog.Errorf("Error while restoring %v", err)
}
restoreCloser.Signal()
}
wg.Done()
}()

return nil
Expand Down

0 comments on commit f733120

Please sign in to comment.