From 2d36978403949676eaa02fe63293c3125447cfb2 Mon Sep 17 00:00:00 2001 From: Naman Jain Date: Wed, 17 Mar 2021 17:00:57 +0530 Subject: [PATCH] chore(upgrade): update the upgrade tool for CORS (depreciated predicates) change (#7486) Some of the internal predicates have been removed in #7431 and #7451. When restoring from a backup taken on a version older than 21.03, we should skip restoring them. Also, an export of such a backup will fail during data loading (bulk/live load). Instead of polluting the backup code with the restore fix-up logic, we will use the dgraph upgrade tool to do the migration. When upgrading from version 20.11 to 21.03, the changes of this PR will be applied. It will update the GraphQL schema with CORS information and drop the deprecated predicates/types. It will also update the persisted query. This PR also fixes the restored backup when --upgrade is set to true in the export_backup command. Note that the persisted query is not fixed in export_backup. --- ee/backup/run.go | 29 ++- upgrade/change_list.go | 23 ++ upgrade/change_v20.03.0.go | 12 +- upgrade/change_v20.07.0.go | 12 +- upgrade/change_v21.03.0.go | 453 ++++++++++++++++++++++++++++++++++++ upgrade/upgrade.go | 19 +- upgrade/utils.go | 148 ++++++++++-- worker/online_restore_ee.go | 6 +- worker/restore.go | 15 +- worker/s3_handler.go | 8 +- 10 files changed, 671 insertions(+), 54 deletions(-) create mode 100644 upgrade/change_v21.03.0.go diff --git a/ee/backup/run.go b/ee/backup/run.go index f1fcd8f2157..12ae12b10d0 100644 --- a/ee/backup/run.go +++ b/ee/backup/run.go @@ -17,21 +17,23 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "os" "path/filepath" "strconv" "strings" "time" - "golang.org/x/sync/errgroup" - + "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3/options" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/credentials" "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/upgrade" 
"github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/ristretto/z" @@ -59,6 +61,7 @@ var opt struct { destination string format string verbose bool + upgrade bool // used by export backup command. } func init() { @@ -324,6 +327,7 @@ func initExportBackup() { Annotations: map[string]string{"group": "tool"}, } + ExportBackup.Cmd.SetHelpTemplate(x.NonRootTemplate) flag := ExportBackup.Cmd.Flags() flag.StringVarP(&opt.location, "location", "l", "", `Sets the location of the backup. Both file URIs and s3 are supported. @@ -332,6 +336,10 @@ func initExportBackup() { "The folder to which export the backups.") flag.StringVarP(&opt.format, "format", "f", "rdf", "The format of the export output. Accepts a value of either rdf or json") + flag.BoolVar(&opt.upgrade, "upgrade", false, + `If true, retrieve the CORS from DB and append at the end of GraphQL schema. + It also deletes the deprecated types and predicates. + Use this option when exporting a backup of 20.11 for loading onto 21.03.`) enc.RegisterFlags(flag) } @@ -376,6 +384,23 @@ func runExportBackup() error { "inside DB at %s: %v", dir, err) continue } + if opt.upgrade && gid == 1 { + // Query the cors in badger db and append it at the end of GraphQL schema. + // This change was introduced in v21.03. Backups with 20.07 <= version < 21.03 + // should apply this. + db, err := badger.OpenManaged(badger.DefaultOptions(dir). + WithNumVersionsToKeep(math.MaxInt32). 
+ WithEncryptionKey(opt.key)) + if err != nil { + return err + } + if err := upgrade.OfflineUpgradeFrom2011To2103(db); err != nil { + return errors.Wrapf(err, "while fixing cors") + } + if err := db.Close(); err != nil { + return err + } + } eg.Go(func() error { return worker.StoreExport(&pb.ExportRequest{ GroupId: uint32(gid), diff --git a/upgrade/change_list.go b/upgrade/change_list.go index 6073660576c..0d6a0b6b790 100644 --- a/upgrade/change_list.go +++ b/upgrade/change_list.go @@ -52,5 +52,28 @@ func init() { // can't handle every scenario. So, it is best to let the user do it. }, }, + { + introducedIn: &version{major: 21, minor: 3, patch: 0}, + changes: []*change{ + { + name: "Upgrade Persistent Query", + description: "This updates the persisted query from old format to new format." + + "Persistent query had 2 predicates which have been merged into a single " + + "predicate dgraph.graphql.p_query. " + + "For more info, see: https://github.com/dgraph-io/dgraph/pull/7451", + minFromVersion: &version{major: 20, minor: 11, patch: 0}, + applyFunc: upgradePersitentQuery, + }, + { + name: "Upgrade CORS", + description: "This updates GraphQL schema to contain the CORS information. " + + "Some of the dgraph internal predicates are removed in v21.03.0. " + + "dgraph.cors that used to store CORS information is one of them. 
" + + "For more info, see: https://github.com/dgraph-io/dgraph/pull/7431", + minFromVersion: &version{major: 20, minor: 11, patch: 0}, + applyFunc: upgradeCORS, + }, + }, + }, } } diff --git a/upgrade/change_v20.03.0.go b/upgrade/change_v20.03.0.go index eae287824da..82f39ac20e1 100644 --- a/upgrade/change_v20.03.0.go +++ b/upgrade/change_v20.03.0.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/x" ) const ( @@ -47,14 +48,11 @@ type rule struct { type rules []rule func upgradeACLRules() error { - dg, conn, err := getDgoClient(true) - if err != nil { - return fmt.Errorf("error getting dgo client: %w", err) - } - defer conn.Close() + dg, cb := x.GetDgraphClient(Upgrade.Conf, true) + defer cb() data := make(map[string][]group) - if err = getQueryResult(dg, queryACLGroupsBefore_v20_03_0, &data); err != nil { + if err := getQueryResult(dg, queryACLGroupsBefore_v20_03_0, &data); err != nil { return fmt.Errorf("error querying old ACL rules: %w", err) } @@ -118,7 +116,7 @@ func upgradeACLRules() error { deleteOld := Upgrade.Conf.GetBool("deleteOld") if deleteOld { - err = alterWithClient(dg, &api.Operation{ + err := alterWithClient(dg, &api.Operation{ DropOp: api.Operation_ATTR, DropValue: "dgraph.group.acl", }) diff --git a/upgrade/change_v20.07.0.go b/upgrade/change_v20.07.0.go index a44d3ae0dd5..f468673f90c 100644 --- a/upgrade/change_v20.07.0.go +++ b/upgrade/change_v20.07.0.go @@ -22,6 +22,7 @@ import ( "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/x" ) const ( @@ -128,19 +129,16 @@ func upgradeAclTypeNames() error { } // get dgo client - dg, conn, err := getDgoClient(true) - if err != nil { - return fmt.Errorf("error getting dgo client: %w", err) - } - defer conn.Close() + dg, cb := x.GetDgraphClient(Upgrade.Conf, true) + defer cb() // apply upgrades for old ACL type names, one by one. 
for _, typeNameInfo := range aclTypeNameInfo { - if err = typeNameInfo.updateTypeName(dg); err != nil { + if err := typeNameInfo.updateTypeName(dg); err != nil { return fmt.Errorf("error upgrading ACL type name from `%s` to `%s`: %w", typeNameInfo.oldTypeName, typeNameInfo.newTypeName, err) } - if err = typeNameInfo.updateTypeSchema(dg); err != nil { + if err := typeNameInfo.updateTypeSchema(dg); err != nil { return fmt.Errorf("error upgrading schema for old ACL type `%s`: %w", typeNameInfo.oldTypeName, err) } diff --git a/upgrade/change_v21.03.0.go b/upgrade/change_v21.03.0.go new file mode 100644 index 00000000000..e3e81341518 --- /dev/null +++ b/upgrade/change_v21.03.0.go @@ -0,0 +1,453 @@ +/* + * Copyright 2021 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package upgrade + +import ( + "fmt" + "math" + "net/http" + "strconv" + + "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/dgo/v200" + "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/graphql/schema" + "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/x" + "github.com/golang/glog" + "github.com/pkg/errors" +) + +const ( + queryCORS_v21_03_0 = `{ + cors(func: has(dgraph.cors)){ + uid + dgraph.cors + } + } + ` + querySchema_v21_03_0 = `{ + schema(func: has(dgraph.graphql.schema)){ + uid + dgraph.graphql.schema + } + } + ` + queryPersistedQuery_v21_03_0 = `{ + pquery(func: type(dgraph.graphql.persisted_query)) { + uid + dgraph.graphql.p_query + dgraph.graphql.p_sha256hash + } + } + ` +) + +type cors struct { + UID string `json:"uid"` + Cors []string `json:"dgraph.cors,omitempty"` +} + +type sch struct { + UID string `json:"uid"` + GQLSchema string `json:"dgraph.graphql.schema"` +} + +type pquery struct { + UID string `json:"uid"` + Query string `json:"dgraph.graphql.p_query,omitempty"` + SHA string `json:"dgraph.graphql.p_sha256hash,omitempty"` +} + +func updateGQLSchema(jwt *api.Jwt, gqlSchema string, corsList []string) error { + if len(gqlSchema) == 0 || len(corsList) == 0 { + fmt.Println("Nothing to update in GraphQL shchema. Either schema or cors not found.") + return nil + } + gqlSchema += "\n\n\n# Below schema elements will only work for dgraph" + + " versions >= 21.03. In older versions it will be ignored." + for _, c := range corsList { + gqlSchema += fmt.Sprintf("\n# Dgraph.Allow-Origin \"%s\"", c) + } + + // Update the schema. + header := http.Header{} + header.Set("X-Dgraph-AccessToken", jwt.AccessJwt) + header.Set("X-Dgraph-AuthToken", Upgrade.Conf.GetString(authToken)) + updateSchemaParams := &GraphQLParams{ + Query: `mutation updateGQLSchema($sch: String!) 
{ + updateGQLSchema(input: { set: { schema: $sch }}) { + gqlSchema { + id + } + } + }`, + Variables: map[string]interface{}{"sch": gqlSchema}, + Headers: header, + } + + adminUrl := Upgrade.Conf.GetString(alphaHttp) + "/admin" + resp, err := makeGqlRequest(updateSchemaParams, adminUrl) + if err != nil { + return err + } + if len(resp.Errors) > 0 { + return errors.Errorf("Error while updating the schema %s\n", resp.Errors.Error()) + } + fmt.Println("Successfully updated the GraphQL schema.") + return nil +} + +var deprecatedPreds = map[string]struct{}{ + "dgraph.cors": {}, + "dgraph.graphql.schema_created_at": {}, + "dgraph.graphql.schema_history": {}, + "dgraph.graphql.p_sha256hash": {}, +} + +var deprecatedTypes = map[string]struct{}{ + "dgraph.type.cors": {}, + "dgraph.graphql.history": {}, +} + +func dropDeprecated(dg *dgo.Dgraph) error { + if !Upgrade.Conf.GetBool(deleteOld) { + return nil + } + for pred := range deprecatedPreds { + op := &api.Operation{ + DropOp: api.Operation_ATTR, + DropValue: pred, + } + if err := alterWithClient(dg, op); err != nil { + return errors.Wrapf(err, "error deleting old predicate %s", pred) + } + } + for typ := range deprecatedTypes { + op := &api.Operation{ + DropOp: api.Operation_TYPE, + DropValue: typ, + } + if err := alterWithClient(dg, op); err != nil { + return errors.Wrapf(err, "error deleting old type %s", typ) + } + } + fmt.Println("Successfully dropped the deprecated predicates") + return nil +} + +func upgradePersitentQuery() error { + dg, cb := x.GetDgraphClient(Upgrade.Conf, true) + defer cb() + + jwt, err := getAccessJwt() + if err != nil { + return errors.Wrap(err, "while getting jwt auth token") + } + + // Get persisted queries. + queryData := make(map[string][]pquery) + if err := getQueryResult(dg, queryPersistedQuery_v21_03_0, &queryData); err != nil { + return errors.Wrap(err, "error querying persisted queries") + } + + // Update the schema with new indexer for persisted query. 
+ updatedSchema := ` + : string @index(sha256) . + type { + + } + ` + if err := alterWithClient(dg, &api.Operation{Schema: updatedSchema}); err != nil { + return fmt.Errorf("error updating the schema for persistent query, %w", err) + } + + // Reinsert these queries. Note that upsert won't work here as 'dgraph.graphql.p_query' is + // graphql reserved type. + header := http.Header{} + header.Set("X-Dgraph-AccessToken", jwt.AccessJwt) + header.Set("X-Dgraph-AuthToken", Upgrade.Conf.GetString(authToken)) + graphqlUrl := Upgrade.Conf.GetString(alphaHttp) + "/graphql" + for _, pquery := range queryData["pquery"] { + updateSchemaParams := &GraphQLParams{ + Query: pquery.Query, + Extensions: &schema.RequestExtensions{PersistedQuery: schema.PersistedQuery{ + Sha256Hash: pquery.SHA, + }}, + Headers: header, + } + + resp, err := makeGqlRequest(updateSchemaParams, graphqlUrl) + if err != nil { + return err + } + if len(resp.Errors) > 0 { + return errors.Errorf("Error while updating the schema %s\n", resp.Errors.Error()) + } + } + + return nil +} + +func upgradeCORS() error { + dg, cb := x.GetDgraphClient(Upgrade.Conf, true) + defer cb() + + jwt, err := getAccessJwt() + if err != nil { + return errors.Wrap(err, "while getting jwt auth token") + } + + // Get CORS. + corsData := make(map[string][]cors) + if err = getQueryResult(dg, queryCORS_v21_03_0, &corsData); err != nil { + return errors.Wrap(err, "error querying cors") + } + + var corsList []string + var maxUid uint64 + for _, cors := range corsData["cors"] { + uid, err := strconv.ParseUint(cors.UID, 0, 64) + if err != nil { + return err + } + if uid > maxUid { + maxUid = uid + if len(cors.Cors) == 1 && cors.Cors[0] == "*" { + // No need to update the GraphQL schema if all origins are allowed. + corsList = corsList[:0] + continue + } + corsList = cors.Cors + } + } + + // Get GraphQL schema. 
+ schemaData := make(map[string][]sch) + if err = getQueryResult(dg, querySchema_v21_03_0, &schemaData); err != nil { + return errors.Wrap(err, "error querying graphql schema") + } + + var gqlSchema string + maxUid = 0 + for _, schema := range schemaData["schema"] { + uid, err := strconv.ParseUint(schema.UID, 0, 64) + if err != nil { + return err + } + if uid > maxUid { + maxUid = uid + gqlSchema = schema.GQLSchema + } + } + + // Update the GraphQL schema. + if err := updateGQLSchema(jwt, gqlSchema, corsList); err != nil { + return err + } + + // Drop all the deprecated predicates and types. + return dropDeprecated(dg) +} + +////////////////////////////////////// +// BELOW CODE IS FOR OFFLINE UPGRADE. +////////////////////////////////////// + +func getData(db *badger.DB, attr string, fn func(item *badger.Item) error) error { + return db.View(func(txn *badger.Txn) error { + attr = x.GalaxyAttr(attr) + initKey := x.ParsedKey{ + Attr: attr, + } + prefix := initKey.DataPrefix() + startKey := append(x.DataKey(attr, math.MaxUint64)) + + itOpt := badger.DefaultIteratorOptions + itOpt.AllVersions = true + itOpt.Reverse = true + itOpt.Prefix = prefix + itr := txn.NewIterator(itOpt) + defer itr.Close() + for itr.Seek(startKey); itr.Valid(); itr.Next() { + item := itr.Item() + // We expect only complete posting list. 
+ x.AssertTrue(item.UserMeta() == posting.BitCompletePosting) + if err := fn(item); err != nil { + return err + } + break + } + return nil + }) +} + +func updateGQLSchemaOffline(db *badger.DB, cors [][]byte) error { + entry := &badger.Entry{} + var version uint64 + err := getData(db, "dgraph.graphql.schema", func(item *badger.Item) error { + var err error + entry.Key = item.KeyCopy(nil) + entry.Value, err = item.ValueCopy(nil) + if err != nil { + return err + } + entry.UserMeta = item.UserMeta() + entry.ExpiresAt = item.ExpiresAt() + version = item.Version() + return nil + }) + if err != nil { + return err + } + if entry.Key == nil { + return nil + } + + var corsBytes []byte + corsBytes = append(corsBytes, []byte("\n\n\n# Below schema elements will only work for dgraph"+ + " versions >= 21.03. In older versions it will be ignored.")...) + for _, val := range cors { + corsBytes = append(corsBytes, []byte(fmt.Sprintf("\n# Dgraph.Allow-Origin \"%s\"", + string(val)))...) + } + + // Append the cors at the end of GraphQL schema. + pl := pb.PostingList{} + if err = pl.Unmarshal(entry.Value); err != nil { + return err + } + pl.Postings[0].Value = append(pl.Postings[0].Value, corsBytes...) 
+ entry.Value, err = pl.Marshal() + if err != nil { + return err + } + + txn := db.NewTransactionAt(math.MaxUint64, true) + defer txn.Discard() + if err = txn.SetEntry(entry); err != nil { + return err + } + return txn.CommitAt(version, nil) +} + +func getCors(db *badger.DB) ([][]byte, error) { + var corsVals [][]byte + err := getData(db, "dgraph.cors", func(item *badger.Item) error { + val, err := item.ValueCopy(nil) + if err != nil { + return err + } + pl := pb.PostingList{} + if err := pl.Unmarshal(val); err != nil { + return err + } + for _, p := range pl.Postings { + corsVals = append(corsVals, p.Value) + } + return nil + }) + return corsVals, err +} + +func dropDepreciated(db *badger.DB) error { + var prefixes [][]byte + for pred := range deprecatedPreds { + pred = x.GalaxyAttr(pred) + prefixes = append(prefixes, x.SchemaKey(pred), x.PredicatePrefix(pred)) + } + for typ := range deprecatedTypes { + prefixes = append(prefixes, x.TypeKey(x.GalaxyAttr(typ))) + } + return db.DropPrefix(prefixes...) +} + +// fixCors removes the internal predicates from the restored backup. This also appends CORS from +// dgraph.cors to GraphQL schema. +func fixCors(db *badger.DB) error { + glog.Infof("Fixing cors information in the restored backup.") + cors, err := getCors(db) + if err != nil { + return errors.Wrapf(err, "Error while getting cors from db.") + } + if err := updateGQLSchemaOffline(db, cors); err != nil { + return errors.Wrapf(err, "Error while updating GraphQL schema.") + } + return errors.Wrapf(dropDepreciated(db), "Error while dropping depreciated preds/types.") +} + +// fixPersistedQuery, for the schema related to persisted query removes the deprecated field from +// the type and updates the index tokenizer for the predicate. 
+func fixPersistedQuery(db *badger.DB) error { + glog.Infof("Fixing persisted query schema in restored backup.") + update := func(entry *badger.Entry) error { + txn := db.NewTransactionAt(math.MaxUint64, true) + defer txn.Discard() + if err := txn.SetEntry(entry); err != nil { + return err + } + // Schema is written at version 1. + return txn.CommitAt(1, nil) + } + + // Update the tokenizer in the schema. + su := pb.SchemaUpdate{ + Predicate: x.GalaxyAttr("dgraph.graphql.p_query"), + ValueType: pb.Posting_STRING, + Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"sha256"}, + } + data, err := su.Marshal() + if err != nil { + return err + } + entry := &badger.Entry{} + entry.Key = x.SchemaKey(x.GalaxyAttr("dgraph.graphql.p_query")) + entry.Value = data + entry.UserMeta = posting.BitSchemaPosting + if err := update(entry); err != nil { + return errors.Wrap(err, "while updating persisted query's schema") + } + + // Update the type. + tu := pb.TypeUpdate{ + TypeName: x.GalaxyAttr("dgraph.graphql.persisted_query"), + Fields: []*pb.SchemaUpdate{&su}, + } + data, err = tu.Marshal() + if err != nil { + return err + } + entry = &badger.Entry{} + entry.Key = x.TypeKey(x.GalaxyAttr("dgraph.graphql.persisted_query")) + entry.Value = data + entry.UserMeta = posting.BitSchemaPosting + if err := update(entry); err != nil { + return errors.Wrap(err, "while updating persisted query's type") + } + return nil +} + +// OfflineUpgradeFrom2011To2103 upgrades a p directory restored from backup of 20.11 to the changes +// in 21.03. It fixes the cors, schema and drops the deprecated types/predicates. 
+func OfflineUpgradeFrom2011To2103(db *badger.DB) error { + if err := fixPersistedQuery(db); err != nil { + return errors.Wrapf(err, "while upgrading persisted query") + } + return errors.Wrapf(fixCors(db), "while upgrading cors") +} diff --git a/upgrade/upgrade.go b/upgrade/upgrade.go index a2e8093ec06..f20a1b9b3e6 100644 --- a/upgrade/upgrade.go +++ b/upgrade/upgrade.go @@ -44,6 +44,9 @@ const ( acl = "acl" dryRun = "dry-run" alpha = "alpha" + slashGrpc = "slash_grpc_endpoint" + authToken = "auth_token" + alphaHttp = "alpha-http" user = "user" password = "password" deleteOld = "deleteOld" @@ -152,12 +155,23 @@ func init() { flag := Upgrade.Cmd.Flags() flag.Bool(acl, false, "upgrade ACL from v1.2.2 to >=v20.03.0") flag.Bool(dryRun, false, "dry-run the upgrade") - flag.StringP(alpha, "a", "127.0.0.1:9080", "Dgraph Alpha gRPC server address") + flag.StringP(alpha, "a", "127.0.0.1:9080", + "Comma separated list of Dgraph Alpha gRPC server address") + flag.String(slashGrpc, "", "Path to Slash GraphQL GRPC endpoint. "+ + "If --slash_grpc_endpoint is set, all other TLS options and connection options will be "+ + "ignored") + flag.String(authToken, "", + "The auth token passed to the server for Alter operation of the schema file. "+ + "If used with --slash_grpc_endpoint, then this should be set to the API token issued"+ + "by Slash GraphQL") + flag.String(alphaHttp, "http://127.0.0.1:8080", "Draph Alpha HTTP(S) endpoint.") flag.StringP(user, "u", "", "Username of ACL user") flag.StringP(password, "p", "", "Password of ACL user") flag.BoolP(deleteOld, "d", true, "Delete the older ACL types/predicates") flag.StringP(from, "f", "", "The version string from which to upgrade, e.g.: v1.2.2") flag.StringP(to, "t", "", "The version string till which to upgrade, e.g.: v20.03.0") + + x.RegisterClientTLSFlags(flag) } func run() { @@ -167,6 +181,9 @@ func run() { return } + // Login using dgo client fetches the information from creds flag. 
+ Upgrade.Conf.Set("creds", fmt.Sprintf("user=%s; password=%s; namespace=%d", + Upgrade.Conf.GetString(user), Upgrade.Conf.GetString(password), x.GalaxyNamespace)) applyChangeList(cmdInput, allChanges) } diff --git a/upgrade/utils.go b/upgrade/utils.go index ceed71d1271..d576fa68ad5 100644 --- a/upgrade/utils.go +++ b/upgrade/utils.go @@ -17,45 +17,151 @@ package upgrade import ( + "bytes" "context" "encoding/json" "fmt" + "io/ioutil" + "net/http" "strings" "time" "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/graphql/schema" "github.com/dgraph-io/dgraph/x" - "google.golang.org/grpc" + "github.com/pkg/errors" ) -// getDgoClient creates a gRPC connection and uses that to create a new dgo client. -// The gRPC.ClientConn returned by this must be closed after use. -func getDgoClient(withLogin bool) (*dgo.Dgraph, *grpc.ClientConn, error) { - alpha := Upgrade.Conf.GetString(alpha) +// getAccessJwt gets the access JWT by logging into the cluster. +func getAccessJwt() (*api.Jwt, error) { + user := Upgrade.Conf.GetString(user) + password := Upgrade.Conf.GetString(password) + header := http.Header{} + header.Set("X-Dgraph-AuthToken", Upgrade.Conf.GetString(authToken)) + updateSchemaParams := &GraphQLParams{ + Query: `mutation login($userId: String, $password: String, $namespace: Int) { + login(userId: $userId, password: $password, namespace: $namespace) { + response { + accessJWT + } + } + }`, + Variables: map[string]interface{}{"userId": user, "password": password, + "namespace": x.GalaxyNamespace}, + Headers: header, + } - // TODO(Aman): add TLS configuration. 
- conn, err := grpc.Dial(alpha, grpc.WithInsecure()) + adminUrl := Upgrade.Conf.GetString(alphaHttp) + "/admin" + resp, err := makeGqlRequest(updateSchemaParams, adminUrl) if err != nil { - return nil, nil, fmt.Errorf("unable to connect to Dgraph cluster: %w", err) + return nil, err + } + if len(resp.Errors) > 0 { + return nil, errors.Errorf("Error while updating the schema %s\n", resp.Errors.Error()) } - dg := dgo.NewDgraphClient(api.NewDgraphClient(conn)) - - if withLogin { - userName := Upgrade.Conf.GetString(user) - password := Upgrade.Conf.GetString(password) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - // login to cluster - //TODO(Ahsan): What should be the namespace here? - if err = dg.LoginIntoNamespace(ctx, userName, password, x.GalaxyNamespace); err != nil { - x.Check(conn.Close()) - return nil, nil, fmt.Errorf("unable to login to Dgraph cluster: %w", err) + type Response struct { + Login struct { + Response struct { + AccessJWT string + } } } + var r Response + if err := json.Unmarshal(resp.Data, &r); err != nil { + return nil, err + } + + jwt := &api.Jwt{AccessJwt: r.Login.Response.AccessJWT} + return jwt, nil +} + +type GraphQLParams struct { + Query string `json:"query"` + OperationName string `json:"operationName"` + Variables map[string]interface{} `json:"variables"` + Extensions *schema.RequestExtensions `json:"extensions,omitempty"` + Headers http.Header +} + +// GraphQLResponse GraphQL response structure. 
+// see https://graphql.github.io/graphql-spec/June2018/#sec-Response +type GraphQLResponse struct { + Data json.RawMessage `json:"data,omitempty"` + Errors x.GqlErrorList `json:"errors,omitempty"` + Extensions map[string]interface{} `json:"extensions,omitempty"` +} + +func makeGqlRequest(params *GraphQLParams, url string) (*GraphQLResponse, error) { + req, err := createGQLPost(params, url) + if err != nil { + return nil, err + } + for h := range params.Headers { + req.Header.Set(h, params.Headers.Get(h)) + } + res, err := runGQLRequest(req) + if err != nil { + return nil, err + } + + var result *GraphQLResponse + if err := json.Unmarshal(res, &result); err != nil { + return nil, err + } + + return result, nil +} + +func createGQLPost(params *GraphQLParams, url string) (*http.Request, error) { + body, err := json.Marshal(params) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + return req, nil +} + +// runGQLRequest runs a HTTP GraphQL request and returns the data or any errors. 
+func runGQLRequest(req *http.Request) ([]byte, error) { + config, err := x.LoadClientTLSConfig(Upgrade.Conf) + if err != nil { + return nil, err + } + tr := &http.Transport{TLSClientConfig: config} + client := &http.Client{Timeout: 50 * time.Second, Transport: tr} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + // GraphQL server should always return OK, even when there are errors + if status := resp.StatusCode; status != http.StatusOK { + return nil, errors.Errorf("unexpected status code: %v", status) + } + + if strings.ToLower(resp.Header.Get("Content-Type")) != "application/json" { + return nil, errors.Errorf("unexpected content type: %v", resp.Header.Get("Content-Type")) + } + + if resp.Header.Get("Access-Control-Allow-Origin") != "*" { + return nil, errors.Errorf("cors headers weren't set in response") + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Errorf("unable to read response body: %v", err) + } - return dg, conn, nil + return body, nil } // getQueryResult executes the given query and unmarshals the result in given pointer queryResPtr. 
diff --git a/worker/online_restore_ee.go b/worker/online_restore_ee.go index f9099b37d6b..566c3c6fa10 100644 --- a/worker/online_restore_ee.go +++ b/worker/online_restore_ee.go @@ -355,7 +355,11 @@ func writeBackup(ctx context.Context, req *pb.RestoreRequest) error { } maxUid, maxNsId, err := loadFromBackup(pstore, &loadBackupInput{ - r: gzReader, restoreTs: req.RestoreTs, preds: in.preds, dropOperations: in.dropOperations, + r: gzReader, + restoreTs: req.RestoreTs, + preds: in.preds, + dropOperations: in.dropOperations, + isOld: in.isOld, }) if err != nil { return 0, 0, errors.Wrapf(err, "cannot write backup") diff --git a/worker/restore.go b/worker/restore.go index ee1e24c4741..a9b43349a5e 100644 --- a/worker/restore.go +++ b/worker/restore.go @@ -63,6 +63,10 @@ func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, } return 0, 0, err } + + if !pathExist(dir) { + fmt.Println("Creating new db:", dir) + } // The badger DB should be opened only after creating the backup // file reader and verifying the encryption in the backup file. db, err := badger.OpenManaged(badger.DefaultOptions(dir). @@ -78,9 +82,6 @@ func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, return 0, 0, err } defer db.Close() - if !pathExist(dir) { - fmt.Println("Creating new db:", dir) - } maxUid, maxNsId, err := loadFromBackup(db, &loadBackupInput{ r: gzReader, restoreTs: 0, @@ -174,12 +175,8 @@ func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) } // Update the max uid and namespace id that has been seen while restoring this backup. - if parsedKey.Uid > maxUid { - maxUid = parsedKey.Uid - } - if namespace > maxNsId { - maxNsId = namespace - } + maxUid = x.Max(maxUid, parsedKey.Uid) + maxNsId = x.Max(maxNsId, namespace) // Override the version if requested. Should not be done for type and schema predicates, // which always have their version set to 1. 
diff --git a/worker/s3_handler.go b/worker/s3_handler.go index 79d7c5c81e6..e61cb646df5 100644 --- a/worker/s3_handler.go +++ b/worker/s3_handler.go @@ -250,12 +250,8 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, backupNum uint64, fn loa if err != nil { return LoadResult{Err: err} } - if groupMaxUid > maxUid { - maxUid = groupMaxUid - } - if groupMaxNsId > maxNsId { - maxNsId = groupMaxNsId - } + maxUid = x.Max(maxUid, groupMaxUid) + maxNsId = x.Max(maxNsId, groupMaxNsId) } since = manifest.Since }