Skip to content

Commit

Permalink
v20.07: fix(backup/restore): fixes backup and restore with DROP operations (GRAPHQL-735) (#6844) (#6922)
Browse files Browse the repository at this point in the history

Fixes GRAPHQL-735.
See [RFC](https://discuss.dgraph.io/t/backup-and-restore-with-operations-like-drop-all/11218) for more details.

(cherry picked from commit 3179f57)

# Conflicts:
#	dgraph/cmd/alpha/run_test.go
#	dgraph/cmd/bulk/systest/test-bulk-schema.sh
#	ee/acl/acl_test.go
#	graphql/e2e/common/admin.go
#	graphql/e2e/directives/schema_response.json
#	graphql/e2e/normal/schema_response.json
#	graphql/e2e/schema/schema_test.go
#	protos/pb/pb.pb.go
#	schema/schema.go
#	systest/backup/encryption/backup_test.go
#	systest/backup/filesystem/backup_test.go
#	systest/backup/minio/backup_test.go
#	systest/export/export_test.go
#	systest/mutations_test.go
#	systest/online-restore/online_restore_test.go
#	systest/queries_test.go
#	tlstest/mtls_internal/backup/encryption/backup_test.go
#	tlstest/mtls_internal/backup/filesystem/backup_test.go
#	tlstest/mtls_internal/backup/minio/backup_test.go
#	worker/backup_processor.go
#	worker/export.go
#	worker/online_restore_ee.go
#	worker/restore.go
#	x/keys.go
#	x/x.go
  • Loading branch information
abhimanyusinghgaur authored Nov 23, 2020
1 parent d33e492 commit c12e0b9
Show file tree
Hide file tree
Showing 33 changed files with 1,534 additions and 501 deletions.
22 changes: 6 additions & 16 deletions dgraph/cmd/alpha/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,9 @@ func TestDeletePredicate(t *testing.T) {
output, err = runGraphqlQuery(`schema{}`)
require.NoError(t, err)

testutil.CompareJSON(t, `{"data":{"schema":[`+
`{"predicate":"age","type":"default"},`+
`{"predicate":"name","type":"string","index":true, "tokenizer":["term"]},`+
x.AclPredicates+","+x.GraphqlPredicates+","+
`{"predicate":"dgraph.type","type":"string","index":true, "tokenizer":["exact"],
"list":true}],`+x.InitialTypes+`}}`, output)
testutil.CompareJSON(t, testutil.GetFullSchemaHTTPResponse(testutil.SchemaOptions{UserPreds: `{"predicate":"age","type":"default"},` +
`{"predicate":"name","type":"string","index":true, "tokenizer":["term"]}`}),
output)

output, err = runGraphqlQuery(q1)
require.NoError(t, err)
Expand Down Expand Up @@ -1076,11 +1073,8 @@ func TestListTypeSchemaChange(t *testing.T) {
q = `schema{}`
res, err = runGraphqlQuery(q)
require.NoError(t, err)
testutil.CompareJSON(t, `{"data":{"schema":[`+
x.AclPredicates+","+x.GraphqlPredicates+","+
`{"predicate":"occupations","type":"string"},`+
`{"predicate":"dgraph.type", "type":"string", "index":true, "tokenizer": ["exact"],
"list":true}],`+x.InitialTypes+`}}`, res)
testutil.CompareJSON(t, testutil.GetFullSchemaHTTPResponse(testutil.
SchemaOptions{UserPreds: `{"predicate":"occupations","type":"string"}`}), res)
}

func TestDeleteAllSP2(t *testing.T) {
Expand Down Expand Up @@ -1323,11 +1317,7 @@ func TestDropAll(t *testing.T) {
q3 := "schema{}"
output, err = runGraphqlQuery(q3)
require.NoError(t, err)
testutil.CompareJSON(t,
`{"data":{"schema":[`+
x.AclPredicates+","+x.GraphqlPredicates+","+
`{"predicate":"dgraph.type", "type":"string", "index":true, "tokenizer":["exact"],
"list":true}],`+x.InitialTypes+`}}`, output)
testutil.CompareJSON(t, testutil.GetFullSchemaHTTPResponse(testutil.SchemaOptions{}), output)

// Reinstate schema so that we can re-run the original query.
err = alterSchemaWithRetry(s)
Expand Down
1 change: 1 addition & 0 deletions dgraph/cmd/bulk/systest/test-bulk-schema.sh
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ EOF
dgraph debug -p out/1/p 2>|/dev/null | grep '{s}' | cut -d' ' -f4 >> all_dbs.out
diff <(LC_ALL=C sort all_dbs.out | uniq -c) - <<EOF
1 dgraph.acl.rule
1 dgraph.drop.op
1 dgraph.graphql.schema
1 dgraph.graphql.xid
1 dgraph.password
Expand Down
40 changes: 40 additions & 0 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,28 @@ func parseSchemaFromAlterOperation(op *api.Operation) (*schema.ParsedSchema, err
return result, nil
}

// insertDropRecord is used to insert a helper record when a DROP operation is performed.
// This helper record lets us know during backup that a DROP operation was performed and that we
// need to write this information in backup manifest. So that while restoring from a backup series,
// we can create an exact replica of the system which existed at the time the last backup was taken.
// Note that if the server crashes after the DROP operation & before this helper record is inserted,
// then restoring from the incremental backup of such a DB would restore even the dropped
// data back.
func insertDropRecord(ctx context.Context, dropOp string) error {
_, err := (&Server{}).doQuery(context.WithValue(ctx, IsGraphql, true),
&api.Request{
Mutations: []*api.Mutation{{
Set: []*api.NQuad{{
Subject: "_:r",
Predicate: "dgraph.drop.op",
ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: dropOp}},
}},
}},
CommitNow: true,
}, NoAuthorize)
return err
}

// Alter handles requests to change the schema or remove parts or all of the data.
func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
ctx, span := otrace.StartSpan(ctx, "Server.Alter")
Expand Down Expand Up @@ -319,6 +341,12 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
return empty, err
}

// insert a helper record for backup & restore, indicating that drop_all was done
err = insertDropRecord(ctx, "DROP_ALL;")
if err != nil {
return empty, err
}

// insert empty GraphQL schema, so all alphas get notified to
// reset their in-memory GraphQL schema
_, err = UpdateGQLSchema(ctx, "", "")
Expand All @@ -344,6 +372,12 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
return empty, err
}

// insert a helper record for backup & restore, indicating that drop_data was done
err = insertDropRecord(ctx, "DROP_DATA;")
if err != nil {
return empty, err
}

// just reinsert the GraphQL schema, no need to alter dgraph schema as this was drop_data
_, err = UpdateGQLSchema(ctx, graphQLSchema, "")
// recreate the admin account after a drop data operation
Expand Down Expand Up @@ -382,6 +416,12 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
edges := []*pb.DirectedEdge{edge}
m.Edges = edges
_, err = query.ApplyMutations(ctx, m)
if err != nil {
return empty, err
}

// insert a helper record for backup & restore, indicating that drop_attr was done
err = insertDropRecord(ctx, "DROP_ATTR;"+attr)
return empty, err
}

Expand Down
7 changes: 5 additions & 2 deletions ee/acl/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,6 @@ func TestExpandQueryWithACLPermissions(t *testing.T) {
testutil.CompareJSON(t, `{"me":[{"name":"RandomGuy","age":23, "nickname":"RG"},{"name":"RandomGuy2","age":25, "nickname":"RG2"}]}`,
string(resp.GetJson()))


userClient, err := testutil.DgraphClient(testutil.SockAddr)
require.NoError(t, err)
time.Sleep(6 * time.Second)
Expand All @@ -1202,7 +1201,7 @@ func TestExpandQueryWithACLPermissions(t *testing.T) {
// Query via user when user has no permissions
resp, err = userClient.NewReadOnlyTxn().Query(ctx, query)
require.NoError(t, err, "Error while querying data")
testutil.CompareJSON(t, `{}`,string(resp.GetJson()))
testutil.CompareJSON(t, `{}`, string(resp.GetJson()))

// Login to groot to modify accesses (1)
accessJwt, _, err = testutil.HttpLogin(&testutil.LoginParams{
Expand Down Expand Up @@ -2081,6 +2080,10 @@ func TestSchemaQueryWithACL(t *testing.T) {
"type": "uid",
"list": true
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down
16 changes: 16 additions & 0 deletions graphql/e2e/common/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const (
// successfully connected
initSchema = `{
"schema": [
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down Expand Up @@ -88,6 +92,10 @@ const (
"predicate": "A.b",
"type": "string"
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down Expand Up @@ -158,6 +166,10 @@ const (
"predicate": "A.c",
"type": "int"
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down Expand Up @@ -239,6 +251,10 @@ const (
"predicate": "A.d",
"type": "float"
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down
4 changes: 4 additions & 0 deletions graphql/e2e/directives/schema_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@
"predicate": "credits",
"type": "float"
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down
4 changes: 4 additions & 0 deletions graphql/e2e/normal/schema_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@
"predicate": "User.password",
"type": "password"
},
{
"predicate": "dgraph.drop.op",
"type": "string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down
4 changes: 4 additions & 0 deletions graphql/e2e/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ func TestConcurrentSchemaUpdates(t *testing.T) {
finalDgraphSchema := fmt.Sprintf(`{
"schema": [
%s,
{
"predicate":"dgraph.drop.op",
"type":"string"
},
{
"predicate": "dgraph.graphql.schema",
"type": "string"
Expand Down
3 changes: 2 additions & 1 deletion posting/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package posting

import (
"fmt"
"io/ioutil"
"math"
"os"
Expand All @@ -35,7 +36,7 @@ func BenchmarkWriter(b *testing.B) {
createKVList := func() bpb.KVList {
var KVList bpb.KVList
for i := 0; i < 5000000; i++ {
n := &bpb.KV{Key: []byte(string(i)), Value: val, Version: 5}
n := &bpb.KV{Key: []byte(fmt.Sprint(i)), Value: val, Version: 5}
KVList.Kv = append(KVList.Kv, n)
}
return KVList
Expand Down
17 changes: 16 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ service Worker {
rpc StreamSnapshot (stream Snapshot) returns (stream KVS) {}
rpc Sort (SortMessage) returns (SortResult) {}
rpc Schema (SchemaRequest) returns (SchemaResult) {}
rpc Backup (BackupRequest) returns (Status) {}
rpc Backup (BackupRequest) returns (BackupResponse) {}
rpc Restore (RestoreRequest) returns (Status) {}
rpc Export (ExportRequest) returns (Status) {}
rpc ReceivePredicate(stream KVS) returns (api.Payload) {}
Expand Down Expand Up @@ -605,6 +605,21 @@ message BackupRequest {
repeated string predicates = 10;
}

// BackupResponse is the message returned by the Worker.Backup RPC.
message BackupResponse {
// The DROP operations reported alongside this backup.
// NOTE(review): presumably derived from the dgraph.drop.op helper records inserted by
// Alter, and destined for the backup manifest — confirm against the backup processor.
repeated DropOperation drop_operations = 1;
}

// DropOperation describes a single DROP operation: its kind (drop_op) and, for ATTR,
// the attribute it applied to (drop_value).
message DropOperation {
// DropOp enumerates the kinds of DROP operation.
// NOTE(review): ALL, DATA and ATTR presumably correspond to the drop_all, drop_data and
// drop_attr operations handled by Alter — confirm.
enum DropOp {
ALL = 0;
DATA = 1;
ATTR = 2;
}
// The kind of DROP operation that was performed.
DropOp drop_op = 1;
// When drop_op is ATTR, drop_value will be the name of the ATTR; empty otherwise.
string drop_value = 2;
}

message ExportRequest {
uint32 group_id = 1; // Group id to back up.
uint64 read_ts = 2;
Expand Down
Loading

0 comments on commit c12e0b9

Please sign in to comment.