Skip to content

Commit

Permalink
feat(restore): introduce incremental restore (#7942)
Browse files Browse the repository at this point in the history
This commit introduces incremental restore. It allows incremental
backups to be restored on top of a set of already restored backups.
In between two incremental restores, the cluster is in draining mode.
  • Loading branch information
ahsanbarkati authored and mangalaman93 committed Feb 18, 2023
1 parent 9f44fe1 commit c7f6c5a
Show file tree
Hide file tree
Showing 47 changed files with 698 additions and 387 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.1
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/ristretto v0.1.1
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
Expand Down
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDws
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
github.com/dgraph-io/gqlparser/v2 v2.2.1 h1:15msK9XEHOSrRqQO48UU+2ZTf1R1U8+tfL9H5D5/eQQ=
github.com/dgraph-io/gqlparser/v2 v2.2.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed h1:pgGMBoTtFhR+xkyzINaToLYRurHn+6pxMYffIGmmEPc=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
Expand Down Expand Up @@ -335,13 +335,10 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9 h1:kLnsdud6Fl1/7ZX/5oD23cqYAzBfuZBhNkGr2NvuEsU=
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/graph-gophers/graphql-transport-ws v0.0.2 h1:DbmSkbIGzj8SvHei6n8Mh9eLQin8PtA8xY9eCzjRpvo=
github.com/graph-gophers/graphql-transport-ws v0.0.2/go.mod h1:5BVKvFzOd2BalVIBFfnfmHjpJi/MZ5rOj8G55mXvZ8g=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
Expand Down
12 changes: 12 additions & 0 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ const adminTypes = `
"""
backupNum: Int
"""
All the backups with num >= incrementalFrom will be restored.
"""
incrementalFrom: Int
"""
If isPartial is set to true then the cluster will be kept in draining mode after
restore. This makes sure that the db is not corrupted by any mutations or tablet
moves in between two restores.
"""
isPartial: Boolean
"""
Path to the key file needed to decrypt the backup. This file should be accessible
by all alphas in the group. The backup will be written using the encryption key
Expand Down
4 changes: 4 additions & 0 deletions graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type restoreInput struct {
Location string
BackupId string
BackupNum int
IncrementalFrom int
IsPartial bool
EncryptionKeyFile string
AccessKey string
SecretKey string
Expand All @@ -57,6 +59,8 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
Location: input.Location,
BackupId: input.BackupId,
BackupNum: uint64(input.BackupNum),
IncrementalFrom: uint64(input.IncrementalFrom),
IsPartial: input.IsPartial,
EncryptionKeyFile: input.EncryptionKeyFile,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
Expand Down
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ message RestoreRequest {
string vault_format = 15;

uint64 backup_num = 16;
uint64 incremental_from = 17;
bool is_partial = 18;
}

message Proposal {
Expand Down
764 changes: 423 additions & 341 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,10 @@ func Load(predicate string) error {

// LoadFromDb reads schema information from db and stores it in memory
func LoadFromDb(ctx context.Context) error {
// Reset the state because with the introduction of incremental restore,
// it can't be assumed that the state would be empty before loading the
// schema from the DB as we don't do drop all in case of incremental restores.
State().DeleteAll()
if err := loadFromDB(ctx, loadSchema); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"full","since":55,"groups":{"1":["dgraph.graphql.schema_created_at","dgraph.graphql.schema","dgraph.drop.op","dgraph.cors","dgraph.graphql.xid","dgraph.graphql.p_sha256hash","dgraph.graphql.p_query","dgraph.type","dgraph.graphql.schema_history"],"2":["name"]},"backup_id":"wizardly_blackburn2","backup_num":1,"encrypted":false,"drop_operations":null}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"incremental","since":58,"groups":{"1":["dgraph.graphql.p_sha256hash","dgraph.graphql.p_query","dgraph.graphql.schema_created_at","dgraph.cors","dgraph.graphql.schema_history","dgraph.type","dgraph.graphql.schema","dgraph.drop.op","dgraph.graphql.xid"],"2":["name","age"]},"backup_id":"wizardly_blackburn2","backup_num":2,"encrypted":false,"drop_operations":null}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"incremental","since":68,"groups":{"1":["dgraph.graphql.schema","dgraph.type","dgraph.graphql.p_query","dgraph.graphql.schema_created_at","dgraph.graphql.xid","dgraph.graphql.schema_history","dgraph.drop.op","dgraph.graphql.p_sha256hash","dgraph.cors"],"2":["name","age"]},"backup_id":"wizardly_blackburn2","backup_num":3,"encrypted":false,"drop_operations":[{"drop_op":2,"drop_value":"name"}]}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"incremental","since":72,"groups":{"1":["dgraph.type","dgraph.graphql.schema_created_at","dgraph.graphql.schema_history","dgraph.graphql.xid","dgraph.graphql.p_query","dgraph.graphql.schema","dgraph.drop.op","dgraph.cors","dgraph.graphql.p_sha256hash"],"2":["age","name"]},"backup_id":"wizardly_blackburn2","backup_num":4,"encrypted":false,"drop_operations":null}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"incremental","since":86,"groups":{"1":["dgraph.graphql.xid","dgraph.graphql.schema","dgraph.drop.op","dgraph.type","dgraph.cors","dgraph.graphql.schema_created_at","dgraph.graphql.schema_history","dgraph.graphql.p_query","dgraph.graphql.p_sha256hash"],"2":["age","name"]},"backup_id":"wizardly_blackburn2","backup_num":5,"encrypted":false,"drop_operations":[{"drop_op":1}]}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"incremental","since":104,"groups":{"1":["dgraph.graphql.p_sha256hash","dgraph.graphql.schema","dgraph.graphql.p_query","dgraph.cors","dgraph.graphql.schema_history","dgraph.graphql.schema_created_at","dgraph.type","dgraph.graphql.xid","dgraph.drop.op"],"2":["name","age"]},"backup_id":"wizardly_blackburn2","backup_num":6,"encrypted":false,"drop_operations":[{}]}
Binary file not shown.
Binary file not shown.
158 changes: 141 additions & 17 deletions systest/online-restore/online_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,37 @@ import (
"github.com/dgraph-io/dgraph/x"
)

func sendRestoreRequest(t *testing.T, location, backupId string, backupNum int) {
if location == "" {
location = "/data/backup2"
}
type restoreReq struct {
location string
backupId string
backupNum int
encKeyFile string
incrementalFrom int
}

const (
backupLocation = "/data/backup2/backups"
backup2011Location = "/data/backup2/backups2011"
encKeyFile = "/data/keys/enc_key"
)

func sendRestoreRequest(t *testing.T, req *restoreReq) {
t.Logf("Restoring backup number: %d\n", req.backupNum)
params := testutil.GraphQLParams{
Query: `mutation restore($location: String!, $backupId: String, $backupNum: Int) {
Query: `mutation restore($location: String!, $backupId: String, $backupNum: Int,
$encKey: String, $incrFrom: Int) {
restore(input: {location: $location, backupId: $backupId, backupNum: $backupNum,
encryptionKeyFile: "/data/keys/enc_key"}) {
encryptionKeyFile: $encKey, incrementalFrom: $incrFrom}) {
code
message
}
}`,
Variables: map[string]interface{}{
"location": location,
"backupId": backupId,
"backupNum": backupNum,
"location": req.location,
"backupId": req.backupId,
"backupNum": req.backupNum,
"encKey": req.encKeyFile,
"incrFrom": req.incrementalFrom,
},
}
resp := testutil.MakeGQLRequestWithTLS(t, &params, testutil.GetAlphaClientConfig(t))
Expand Down Expand Up @@ -207,8 +222,10 @@ func TestBasicRestore(t *testing.T) {
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

snapshotTs := getSnapshotTs(t)
sendRestoreRequest(t, "", "youthful_rhodes3", 0)
req := &restoreReq{location: backupLocation, backupId: "youthful_rhodes3", encKeyFile: encKeyFile}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

// Snapshot must be taken just after the restore and hence the snapshotTs be updated.
require.NoError(t, x.RetryUntilSuccess(3, 2*time.Second, func() error {
if getSnapshotTs(t) <= snapshotTs {
Expand All @@ -220,6 +237,107 @@ func TestBasicRestore(t *testing.T) {
runMutations(t, dg)
}

// The 6 backups that are being restored in this test were taken in the following manner
// Add _:a <name> "alice" .
// Take backup b1
// Add _:b <name> "bob" .
// Add _:b <age> "12" .
// Take backup b2
// Drop attribute "name"
// Take backup b3
// Add _:a <name> "alice" .
// Take backup b4
// drop data
// Take backup b5
// drop all
// Take backup b6
func TestIncrementalRestore(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

req := &restoreReq{
location: backup2011Location,
backupNum: 1,
}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

query := `{ q(func: has(name)) { name age } }`
runQuery := func(query, expectedResp string) {
res, err := dg.NewTxn().Query(context.Background(), query)
require.NoError(t, err)
require.JSONEq(t, expectedResp, string(res.Json))
}
runQuery(query, `{"q":[{"name":"alice"}]}`)

req = &restoreReq{
location: backup2011Location,
backupNum: 2,
incrementalFrom: 2,
}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)
runQuery(query, `{"q":[{"name":"alice"}, {"name":"bob", "age": "12"}]}`)

req = &restoreReq{
location: backup2011Location,
backupNum: 3,
incrementalFrom: 3,
}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)
runQuery(query, `{"q":[]}`)
runQuery(`{ q(func: has(age)) {age} }`, `{"q":[{"age": "12"}]}`)

req = &restoreReq{
location: backup2011Location,
backupNum: 4,
incrementalFrom: 4,
}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)
runQuery(query, `{"q":[{"name":"alice"}]}`)

req = &restoreReq{
location: backup2011Location,
backupNum: 5,
incrementalFrom: 5,
}

sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)
runQuery(query, `{"q":[]}`)
runQuery(`{ q(func: has(age)) {age} }`, `{"q":[]}`)

// after drop data, there should be no data but schema should still contain the predicates.
res, err := dg.NewTxn().Query(context.Background(), `schema{}`)
require.NoError(t, err)
require.Contains(t, string(res.Json), `{"predicate":"age","type":"default"}`)
require.Contains(t, string(res.Json), `{"predicate":"name","type":"default"}`)

req = &restoreReq{
location: backup2011Location,
backupNum: 6,
incrementalFrom: 6,
}

// after drop all
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)
runQuery(query, `{"q":[]}`)
runQuery(`{ q(func: has(age)) {age} }`, `{"q":[]}`)

res, err = dg.NewTxn().Query(context.Background(), `schema{}`)
require.NoError(t, err)
require.NotContains(t, string(res.Json), `{"predicate":"age","type":"default"}`)
require.NotContains(t, string(res.Json), `{"predicate":"name","type":"default"}`)
}

func TestRestoreBackupNum(t *testing.T) {
disableDraining(t)

Expand All @@ -234,8 +352,10 @@ func TestRestoreBackupNum(t *testing.T) {
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))
runQueries(t, dg, true)

sendRestoreRequest(t, "", "youthful_rhodes3", 1)
req := &restoreReq{location: backupLocation, backupId: "youthful_rhodes3", backupNum: 1, encKeyFile: encKeyFile}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

runQueries(t, dg, true)
runMutations(t, dg)
}
Expand All @@ -256,7 +376,7 @@ func TestRestoreBackupNumInvalid(t *testing.T) {

// Send a request with a backupNum greater than the number of manifests.
restoreRequest := fmt.Sprintf(`mutation restore() {
restore(input: {location: "/data/backup2", backupId: "%s", backupNum: %d,
restore(input: {location: "/data/backup2/backups", backupId: "%s", backupNum: %d,
encryptionKeyFile: "/data/keys/enc_key"}) {
code
message
Expand All @@ -279,7 +399,7 @@ func TestRestoreBackupNumInvalid(t *testing.T) {

// Send a request with a negative backupNum value.
restoreRequest = fmt.Sprintf(`mutation restore() {
restore(input: {location: "/data/backup2", backupId: "%s", backupNum: %d,
restore(input: {location: "/data/backup2/backups", backupId: "%s", backupNum: %d,
encryptionKeyFile: "/data/keys/enc_key"}) {
code
message
Expand Down Expand Up @@ -313,13 +433,15 @@ func TestMoveTablets(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

sendRestoreRequest(t, "", "youthful_rhodes3", 0)
req := &restoreReq{location: backupLocation, backupId: "youthful_rhodes3", encKeyFile: encKeyFile}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)
runQueries(t, dg, false)

// Send another restore request with a different backup. This backup has some of the
// same predicates as the previous one but they are stored in different groups.
sendRestoreRequest(t, "", "blissful_hermann1", 0)
req = &restoreReq{location: backupLocation, backupId: "blissful_hermann1", encKeyFile: encKeyFile}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

resp, err := dg.NewTxn().Query(context.Background(), `{
Expand Down Expand Up @@ -367,7 +489,7 @@ func TestInvalidBackupId(t *testing.T) {

func TestListBackups(t *testing.T) {
query := `query backup() {
listBackups(input: {location: "/data/backup2"}) {
listBackups(input: {location: "/data/backup2/backups"}) {
backupId
backupNum
encrypted
Expand Down Expand Up @@ -595,9 +717,11 @@ func backup(t *testing.T, backupDir string) {

func backupRestoreAndVerify(t *testing.T, dg *dgo.Dgraph, backupDir, queryToVerify,
expectedResponse string, schemaVerificationOpts testutil.SchemaOptions) {

schemaVerificationOpts.ExcludeAclSchema = true
backup(t, backupDir)
sendRestoreRequest(t, backupDir, "", 0)
req := &restoreReq{location: backupDir, encKeyFile: encKeyFile}
sendRestoreRequest(t, req)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)
testutil.VerifyQueryResponse(t, dg, queryToVerify, expectedResponse)
testutil.VerifySchema(t, dg, schemaVerificationOpts)
Expand Down
2 changes: 1 addition & 1 deletion testutil/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func WaitForRestore(t *testing.T, dg *dgo.Dgraph, HttpSocket string) {
restoreDone = true
break
}
time.Sleep(4 * time.Second)
time.Sleep(1 * time.Second)
}
require.True(t, restoreDone)

Expand Down
6 changes: 4 additions & 2 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,9 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
case proposal.Restore != nil:
// Enable draining mode for the duration of the restore processing.
x.UpdateDrainingMode(true)
defer x.UpdateDrainingMode(false)
if !proposal.Restore.IsPartial {
defer x.UpdateDrainingMode(false)
}

var err error
var closer *z.Closer
Expand All @@ -653,7 +655,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
}
defer closer.Done()

glog.Infof("Got restore proposal at Index:%d, ReadTs:%d",
glog.Infof("Got restore proposal at Index: %d, ReadTs: %d",
proposal.Index, proposal.Restore.RestoreTs)
if err := handleRestoreProposal(ctx, proposal.Restore, proposal.Index); err != nil {
return err
Expand Down
Loading

0 comments on commit c7f6c5a

Please sign in to comment.