Skip to content

Commit

Permalink
feat(Restore): Async restore operations. (#5704)
Browse files Browse the repository at this point in the history
Previously, restore operations blocked the HTTP response until they were completed.
This PR changes the flow so that the request returns immediately after the restore
is proposed to the cluster.

The request returns a unique ID for the restore operation that can be used to track the
status of the response.

Fixes DGRAPH-1697
  • Loading branch information
martinmr authored and Arijit Das committed Jul 14, 2020
1 parent 5bfa9f6 commit dba52ba
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 39 deletions.
7 changes: 2 additions & 5 deletions ee/acl/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,10 +1973,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
query: `
mutation {
restore(input: {location: "", backupId: "", encryptionKeyFile: ""}) {
response {
code
message
}
code
}
}`,
queryName: "restore",
Expand All @@ -1986,7 +1983,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
" manifests: The path \"\" does not exist or it is inaccessible.",
Locations: []x.Location{{Line: 3, Column: 8}},
}},
guardianData: `{"restore": null}`,
guardianData: `{"restore": {"code": "Failure"}}`,
},
{
name: "getGQLSchema has guardian auth",
Expand Down
12 changes: 8 additions & 4 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,11 @@ var (
resolve.GuardianAuthMW4Mutation,
}
adminQueryMWConfig = map[string]resolve.QueryMiddlewares{
"health": {resolve.IpWhitelistingMW4Query}, // dgraph handles Guardian auth for health
"state": {resolve.IpWhitelistingMW4Query}, // dgraph handles Guardian auth for state
"config": commonAdminQueryMWs,
"listBackups": commonAdminQueryMWs,
"health": {resolve.IpWhitelistingMW4Query}, // dgraph handles Guardian auth for health
"state": {resolve.IpWhitelistingMW4Query}, // dgraph handles Guardian auth for state
"config": commonAdminQueryMWs,
"listBackups": commonAdminQueryMWs,
"restoreStatus": commonAdminQueryMWs,
// not applying ip whitelisting to keep it in sync with /alter
"getGQLSchema": {resolve.GuardianAuthMW4Query},
// for queries and mutations related to User/Group, dgraph handles Guardian auth,
Expand Down Expand Up @@ -490,6 +491,9 @@ func newAdminResolverFactory() resolve.ResolverFactory {
WithQueryResolver("listBackups", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(resolveListBackups)
}).
WithQueryResolver("restoreStatus", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(resolveRestoreStatus)
}).
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
return resolve.MutationResolverFunc(
func(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
Expand Down
35 changes: 33 additions & 2 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,21 @@ const adminTypes = `
}
type RestorePayload {
response: Response
"""
A short string indicating whether the restore operation was successfully scheduled.
The status of the operation can be queried using the restoreStatus endpoint.
"""
code: String
"""
Includes the error message if the operation failed.
"""
message: String
"""
The unique ID that can be used to query the status of the restore operation.
"""
restoreId: Int
}
input ListBackupsInput {
Expand Down Expand Up @@ -209,6 +223,18 @@ const adminTypes = `
"""
type: String
}
type RestoreStatus {
"""
The status of the restore operation. One of UNKNOWN, IN_PROGRESS, OK, or ERR.
"""
status: String!
"""
A list of error messages if the restore operation failed.
"""
errors: [String]
}
type LoginResponse {
Expand Down Expand Up @@ -465,4 +491,9 @@ const adminQueries = `
"""
Get the information about the backups at a given location.
"""
listBackups(input: ListBackupsInput!) : [Manifest]`
listBackups(input: ListBackupsInput!) : [Manifest]
"""
Get information about a restore operation.
"""
restoreStatus(restoreId: Int!) : RestoreStatus`
20 changes: 15 additions & 5 deletions graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type restoreInput struct {
}

func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {

input, err := getRestoreInput(m)
if err != nil {
return resolve.EmptyResult(m, err), false
Expand All @@ -62,15 +61,26 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
VaultSecretidFile: input.VaultSecretIDFile,
VaultPath: input.VaultPath,
VaultField: input.VaultField,
VaultFormat: input.VaultFormat,
VaultFormat: input.VaultFormat,
}
err = worker.ProcessRestoreRequest(context.Background(), &req)
restoreId, err := worker.ProcessRestoreRequest(context.Background(), &req)
if err != nil {
return resolve.EmptyResult(m, err), false
worker.DeleteRestoreId(restoreId)
return &resolve.Resolved{
Data: map[string]interface{}{m.Name(): map[string]interface{}{
"code": "Failure",
}},
Field: m,
Err: schema.GQLWrapLocationf(err, m.Location(), "resolving %s failed", m.Name()),
}, false
}

return &resolve.Resolved{
Data: map[string]interface{}{m.Name(): response("Success", "Restore completed.")},
Data: map[string]interface{}{m.Name(): map[string]interface{}{
"code": "Success",
"message": "Restore operation started.",
"restoreId": restoreId,
}},
Field: m,
}, true
}
Expand Down
82 changes: 82 additions & 0 deletions graphql/admin/restore_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package admin

import (
"context"
"encoding/json"

"github.com/dgraph-io/dgraph/graphql/resolve"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/worker"
)

type restoreStatus struct {
Status string `json:"status,omitempty"`
Errors []string `json:"errors,omitempty"`
}

func unknownStatus(q schema.Query, err error) *resolve.Resolved {
return &resolve.Resolved{
Data: map[string]interface{}{q.Name(): map[string]interface{}{
"status": "UNKNOWN",
}},
Field: q,
Err: schema.GQLWrapLocationf(err, q.Location(), "resolving %s failed", q.Name()),
}
}

func resolveRestoreStatus(ctx context.Context, q schema.Query) *resolve.Resolved {
restoreId := int(q.ArgValue("restoreId").(int64))
status, err := worker.ProcessRestoreStatus(ctx, restoreId)
if err != nil {
return unknownStatus(q, err)
}
if status == nil {
return unknownStatus(q, err)
}
convertedStatus := convertStatus(status)

b, err := json.Marshal(convertedStatus)
if err != nil {
return unknownStatus(q, err)
}
result := make(map[string]interface{})
err = json.Unmarshal(b, &result)
if err != nil {
return unknownStatus(q, err)
}

return &resolve.Resolved{
Data: map[string]interface{}{q.Name(): result},
Field: q,
}
}

func convertStatus(status *worker.RestoreStatus) *restoreStatus {
if status == nil {
return nil
}
res := &restoreStatus{
Status: status.Status,
Errors: make([]string, len(status.Errors)),
}
for i, err := range status.Errors {
res.Errors[i] = err.Error()
}
return res
}
57 changes: 46 additions & 11 deletions systest/online-restore/online_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ import (
"github.com/dgraph-io/dgraph/testutil"
)

func sendRestoreRequest(t *testing.T, backupId string, dg *dgo.Dgraph) {
func sendRestoreRequest(t *testing.T, backupId string) int {
restoreRequest := fmt.Sprintf(`mutation restore() {
restore(input: {location: "/data/backup", backupId: "%s",
encryptionKeyFile: "/data/keys/enc_key"}) {
response {
code
message
}
code
message
restoreId
}
}`, backupId)

Expand All @@ -58,8 +57,42 @@ func sendRestoreRequest(t *testing.T, backupId string, dg *dgo.Dgraph) {
resp, err := http.Post(adminUrl, "application/json", bytes.NewBuffer(b))
require.NoError(t, err)
buf, err := ioutil.ReadAll(resp.Body)
bufString := string(buf)
require.NoError(t, err)
require.Contains(t, bufString, "Success")
jsonMap := make(map[string]map[string]interface{})
require.NoError(t, json.Unmarshal([]byte(bufString), &jsonMap))
restoreId := int(jsonMap["data"]["restore"].(map[string]interface{})["restoreId"].(float64))
require.NotEqual(t, "", restoreId)
return restoreId
}

func waitForRestore(t *testing.T, restoreId int, dg *dgo.Dgraph) {
query := fmt.Sprintf(`query status() {
restoreStatus(restoreId: %d) {
status
errors
}
}`, restoreId)
adminUrl := "http://localhost:8180/admin"
params := testutil.GraphQLParams{
Query: query,
}
b, err := json.Marshal(params)
require.NoError(t, err)
require.Contains(t, string(buf), "Restore completed.")

for i := 0; i < 15; i++ {
resp, err := http.Post(adminUrl, "application/json", bytes.NewBuffer(b))
require.NoError(t, err)
buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
sbuf := string(buf)
if strings.Contains(sbuf, "OK") {
return
}
time.Sleep(time.Second)
}
require.True(t, false, "restore operation did not complete after max number of retries")

// Wait for the client to exit draining mode. This is needed because the client might
// be connected to a follower and might be behind the leader in applying the restore.
Expand Down Expand Up @@ -179,7 +212,8 @@ func TestBasicRestore(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

sendRestoreRequest(t, "youthful_rhodes3", dg)
restoreId := sendRestoreRequest(t, "youthful_rhodes3")
waitForRestore(t, restoreId, dg)
runQueries(t, dg, false)
runMutations(t, dg)
}
Expand All @@ -194,12 +228,14 @@ func TestMoveTablets(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

sendRestoreRequest(t, "youthful_rhodes3", dg)
restoreId := sendRestoreRequest(t, "youthful_rhodes3")
waitForRestore(t, restoreId, dg)
runQueries(t, dg, false)

// Send another restore request with a different backup. This backup has some of the
// same predicates as the previous one but they are stored in different groups.
sendRestoreRequest(t, "blissful_hermann1", dg)
restoreId = sendRestoreRequest(t, "blissful_hermann1")
waitForRestore(t, restoreId, dg)

resp, err := dg.NewTxn().Query(context.Background(), `{
q(func: has(name), orderasc: name) {
Expand All @@ -226,10 +262,9 @@ func TestInvalidBackupId(t *testing.T) {
restoreRequest := `mutation restore() {
restore(input: {location: "/data/backup", backupId: "bad-backup-id",
encryptionKeyFile: "/data/keys/enc_key"}) {
response {
code
message
}
restoreId
}
}`

Expand Down
Loading

0 comments on commit dba52ba

Please sign in to comment.