diff --git a/ee/backup/run.go b/ee/backup/run.go index f1fcd8f2157..12ae12b10d0 100644 --- a/ee/backup/run.go +++ b/ee/backup/run.go @@ -17,21 +17,23 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "os" "path/filepath" "strconv" "strings" "time" - "golang.org/x/sync/errgroup" - + "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3/options" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/credentials" "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/upgrade" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/ristretto/z" @@ -59,6 +61,7 @@ var opt struct { destination string format string verbose bool + upgrade bool // used by export backup command. } func init() { @@ -324,6 +327,7 @@ func initExportBackup() { Annotations: map[string]string{"group": "tool"}, } + ExportBackup.Cmd.SetHelpTemplate(x.NonRootTemplate) flag := ExportBackup.Cmd.Flags() flag.StringVarP(&opt.location, "location", "l", "", `Sets the location of the backup. Both file URIs and s3 are supported. @@ -332,6 +336,10 @@ func initExportBackup() { "The folder to which export the backups.") flag.StringVarP(&opt.format, "format", "f", "rdf", "The format of the export output. Accepts a value of either rdf or json") + flag.BoolVar(&opt.upgrade, "upgrade", false, + `If true, retrieve the CORS from DB and append at the end of GraphQL schema. + It also deletes the deprecated types and predicates. + Use this option when exporting a backup of 20.11 for loading onto 21.03.`) enc.RegisterFlags(flag) } @@ -376,6 +384,23 @@ func runExportBackup() error { "inside DB at %s: %v", dir, err) continue } + if opt.upgrade && gid == 1 { + // Query the cors in badger db and append it at the end of GraphQL schema. + // This change was introduced in v21.03. Backups with 20.07 <= version < 21.03 + // should apply this. + db, err := badger.OpenManaged(badger.DefaultOptions(dir). + WithNumVersionsToKeep(math.MaxInt32). + WithEncryptionKey(opt.key)) + if err != nil { + return err + } + if err := upgrade.OfflineUpgradeFrom2011To2103(db); err != nil { + return errors.Wrapf(err, "while fixing cors") + } + if err := db.Close(); err != nil { + return err + } + } eg.Go(func() error { return worker.StoreExport(&pb.ExportRequest{ GroupId: uint32(gid), diff --git a/upgrade/change_list.go b/upgrade/change_list.go index 6073660576c..0d6a0b6b790 100644 --- a/upgrade/change_list.go +++ b/upgrade/change_list.go @@ -52,5 +52,28 @@ func init() { // can't handle every scenario. So, it is best to let the user do it. }, }, + { + introducedIn: &version{major: 21, minor: 3, patch: 0}, + changes: []*change{ + { + name: "Upgrade Persistent Query", + description: "This updates the persisted query from old format to new format." + + "Persistent query had 2 predicates which have been merged into a single " + + "predicate dgraph.graphql.p_query. " + + "For more info, see: https://github.com/dgraph-io/dgraph/pull/7451", + minFromVersion: &version{major: 20, minor: 11, patch: 0}, + applyFunc: upgradePersitentQuery, + }, + { + name: "Upgrade CORS", + description: "This updates GraphQL schema to contain the CORS information. " + + "Some of the dgraph internal predicates are removed in v21.03.0. " + + "dgraph.cors that used to store CORS information is one of them. " + + "For more info, see: https://github.com/dgraph-io/dgraph/pull/7431", + minFromVersion: &version{major: 20, minor: 11, patch: 0}, + applyFunc: upgradeCORS, + }, + }, + }, } } diff --git a/upgrade/change_v20.03.0.go b/upgrade/change_v20.03.0.go index eae287824da..82f39ac20e1 100644 --- a/upgrade/change_v20.03.0.go +++ b/upgrade/change_v20.03.0.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/x" ) const ( @@ -47,14 +48,11 @@ type rule struct { type rules []rule func upgradeACLRules() error { - dg, conn, err := getDgoClient(true) - if err != nil { - return fmt.Errorf("error getting dgo client: %w", err) - } - defer conn.Close() + dg, cb := x.GetDgraphClient(Upgrade.Conf, true) + defer cb() data := make(map[string][]group) - if err = getQueryResult(dg, queryACLGroupsBefore_v20_03_0, &data); err != nil { + if err := getQueryResult(dg, queryACLGroupsBefore_v20_03_0, &data); err != nil { return fmt.Errorf("error querying old ACL rules: %w", err) } @@ -118,7 +116,7 @@ func upgradeACLRules() error { deleteOld := Upgrade.Conf.GetBool("deleteOld") if deleteOld { - err = alterWithClient(dg, &api.Operation{ + err := alterWithClient(dg, &api.Operation{ DropOp: api.Operation_ATTR, DropValue: "dgraph.group.acl", }) diff --git a/upgrade/change_v20.07.0.go b/upgrade/change_v20.07.0.go index a44d3ae0dd5..f468673f90c 100644 --- a/upgrade/change_v20.07.0.go +++ b/upgrade/change_v20.07.0.go @@ -22,6 +22,7 @@ import ( "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/x" ) const ( @@ -128,19 +129,16 @@ func upgradeAclTypeNames() error { } // get dgo client - dg, conn, err := getDgoClient(true) - if err != nil { - return fmt.Errorf("error getting dgo client: %w", err) - } - defer conn.Close() + dg, cb := x.GetDgraphClient(Upgrade.Conf, true) + defer cb() // apply upgrades for old ACL type names, one by one. for _, typeNameInfo := range aclTypeNameInfo { - if err = typeNameInfo.updateTypeName(dg); err != nil { + if err := typeNameInfo.updateTypeName(dg); err != nil { return fmt.Errorf("error upgrading ACL type name from `%s` to `%s`: %w", typeNameInfo.oldTypeName, typeNameInfo.newTypeName, err) } - if err = typeNameInfo.updateTypeSchema(dg); err != nil { + if err := typeNameInfo.updateTypeSchema(dg); err != nil { return fmt.Errorf("error upgrading schema for old ACL type `%s`: %w", typeNameInfo.oldTypeName, err) } diff --git a/upgrade/change_v21.03.0.go b/upgrade/change_v21.03.0.go new file mode 100644 index 00000000000..e3e81341518 --- /dev/null +++ b/upgrade/change_v21.03.0.go @@ -0,0 +1,453 @@ +/* + * Copyright 2021 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package upgrade + +import ( + "fmt" + "math" + "net/http" + "strconv" + + "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/dgo/v200" + "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/graphql/schema" + "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/x" + "github.com/golang/glog" + "github.com/pkg/errors" +) + +const ( + queryCORS_v21_03_0 = `{ + cors(func: has(dgraph.cors)){ + uid + dgraph.cors + } + } + ` + querySchema_v21_03_0 = `{ + schema(func: has(dgraph.graphql.schema)){ + uid + dgraph.graphql.schema + } + } + ` + queryPersistedQuery_v21_03_0 = `{ + pquery(func: type(dgraph.graphql.persisted_query)) { + uid + dgraph.graphql.p_query + dgraph.graphql.p_sha256hash + } + } + ` +) + +type cors struct { + UID string `json:"uid"` + Cors []string `json:"dgraph.cors,omitempty"` +} + +type sch struct { + UID string `json:"uid"` + GQLSchema string `json:"dgraph.graphql.schema"` +} + +type pquery struct { + UID string `json:"uid"` + Query string `json:"dgraph.graphql.p_query,omitempty"` + SHA string `json:"dgraph.graphql.p_sha256hash,omitempty"` +} + +func updateGQLSchema(jwt *api.Jwt, gqlSchema string, corsList []string) error { + if len(gqlSchema) == 0 || len(corsList) == 0 { + fmt.Println("Nothing to update in GraphQL shchema. Either schema or cors not found.") + return nil + } + gqlSchema += "\n\n\n# Below schema elements will only work for dgraph" + + " versions >= 21.03. In older versions it will be ignored." + for _, c := range corsList { + gqlSchema += fmt.Sprintf("\n# Dgraph.Allow-Origin \"%s\"", c) + } + + // Update the schema. + header := http.Header{} + header.Set("X-Dgraph-AccessToken", jwt.AccessJwt) + header.Set("X-Dgraph-AuthToken", Upgrade.Conf.GetString(authToken)) + updateSchemaParams := &GraphQLParams{ + Query: `mutation updateGQLSchema($sch: String!) { + updateGQLSchema(input: { set: { schema: $sch }}) { + gqlSchema { + id + } + } + }`, + Variables: map[string]interface{}{"sch": gqlSchema}, + Headers: header, + } + + adminUrl := Upgrade.Conf.GetString(alphaHttp) + "/admin" + resp, err := makeGqlRequest(updateSchemaParams, adminUrl) + if err != nil { + return err + } + if len(resp.Errors) > 0 { + return errors.Errorf("Error while updating the schema %s\n", resp.Errors.Error()) + } + fmt.Println("Successfully updated the GraphQL schema.") + return nil +} + +var deprecatedPreds = map[string]struct{}{ + "dgraph.cors": {}, + "dgraph.graphql.schema_created_at": {}, + "dgraph.graphql.schema_history": {}, + "dgraph.graphql.p_sha256hash": {}, +} + +var deprecatedTypes = map[string]struct{}{ + "dgraph.type.cors": {}, + "dgraph.graphql.history": {}, +} + +func dropDeprecated(dg *dgo.Dgraph) error { + if !Upgrade.Conf.GetBool(deleteOld) { + return nil + } + for pred := range deprecatedPreds { + op := &api.Operation{ + DropOp: api.Operation_ATTR, + DropValue: pred, + } + if err := alterWithClient(dg, op); err != nil { + return errors.Wrapf(err, "error deleting old predicate %s", pred) + } + } + for typ := range deprecatedTypes { + op := &api.Operation{ + DropOp: api.Operation_TYPE, + DropValue: typ, + } + if err := alterWithClient(dg, op); err != nil { + return errors.Wrapf(err, "error deleting old type %s", typ) + } + } + fmt.Println("Successfully dropped the deprecated predicates") + return nil +} + +func upgradePersitentQuery() error { + dg, cb := x.GetDgraphClient(Upgrade.Conf, true) + defer cb() + + jwt, err := getAccessJwt() + if err != nil { + return errors.Wrap(err, "while getting jwt auth token") + } + + // Get persisted queries. + queryData := make(map[string][]pquery) + if err := getQueryResult(dg, queryPersistedQuery_v21_03_0, &queryData); err != nil { + return errors.Wrap(err, "error querying persisted queries") + } + + // Update the schema with new indexer for persisted query. + updatedSchema := ` + : string @index(sha256) . + type { + + } + ` + if err := alterWithClient(dg, &api.Operation{Schema: updatedSchema}); err != nil { + return fmt.Errorf("error updating the schema for persistent query, %w", err) + } + + // Reinsert these queries. Note that upsert won't work here as 'dgraph.graphql.p_query' is + // graphql reserved type. + header := http.Header{} + header.Set("X-Dgraph-AccessToken", jwt.AccessJwt) + header.Set("X-Dgraph-AuthToken", Upgrade.Conf.GetString(authToken)) + graphqlUrl := Upgrade.Conf.GetString(alphaHttp) + "/graphql" + for _, pquery := range queryData["pquery"] { + updateSchemaParams := &GraphQLParams{ + Query: pquery.Query, + Extensions: &schema.RequestExtensions{PersistedQuery: schema.PersistedQuery{ + Sha256Hash: pquery.SHA, + }}, + Headers: header, + } + + resp, err := makeGqlRequest(updateSchemaParams, graphqlUrl) + if err != nil { + return err + } + if len(resp.Errors) > 0 { + return errors.Errorf("Error while updating the schema %s\n", resp.Errors.Error()) + } + } + + return nil +} + +func upgradeCORS() error { + dg, cb := x.GetDgraphClient(Upgrade.Conf, true) + defer cb() + + jwt, err := getAccessJwt() + if err != nil { + return errors.Wrap(err, "while getting jwt auth token") + } + + // Get CORS. + corsData := make(map[string][]cors) + if err = getQueryResult(dg, queryCORS_v21_03_0, &corsData); err != nil { + return errors.Wrap(err, "error querying cors") + } + + var corsList []string + var maxUid uint64 + for _, cors := range corsData["cors"] { + uid, err := strconv.ParseUint(cors.UID, 0, 64) + if err != nil { + return err + } + if uid > maxUid { + maxUid = uid + if len(cors.Cors) == 1 && cors.Cors[0] == "*" { + // No need to update the GraphQL schema if all origins are allowed. + corsList = corsList[:0] + continue + } + corsList = cors.Cors + } + } + + // Get GraphQL schema. + schemaData := make(map[string][]sch) + if err = getQueryResult(dg, querySchema_v21_03_0, &schemaData); err != nil { + return errors.Wrap(err, "error querying graphql schema") + } + + var gqlSchema string + maxUid = 0 + for _, schema := range schemaData["schema"] { + uid, err := strconv.ParseUint(schema.UID, 0, 64) + if err != nil { + return err + } + if uid > maxUid { + maxUid = uid + gqlSchema = schema.GQLSchema + } + } + + // Update the GraphQL schema. + if err := updateGQLSchema(jwt, gqlSchema, corsList); err != nil { + return err + } + + // Drop all the deprecated predicates and types. + return dropDeprecated(dg) +} + +////////////////////////////////////// +// BELOW CODE IS FOR OFFLINE UPGRADE. +////////////////////////////////////// + +func getData(db *badger.DB, attr string, fn func(item *badger.Item) error) error { + return db.View(func(txn *badger.Txn) error { + attr = x.GalaxyAttr(attr) + initKey := x.ParsedKey{ + Attr: attr, + } + prefix := initKey.DataPrefix() + startKey := append(x.DataKey(attr, math.MaxUint64)) + + itOpt := badger.DefaultIteratorOptions + itOpt.AllVersions = true + itOpt.Reverse = true + itOpt.Prefix = prefix + itr := txn.NewIterator(itOpt) + defer itr.Close() + for itr.Seek(startKey); itr.Valid(); itr.Next() { + item := itr.Item() + // We expect only complete posting list. + x.AssertTrue(item.UserMeta() == posting.BitCompletePosting) + if err := fn(item); err != nil { + return err + } + break + } + return nil + }) +} + +func updateGQLSchemaOffline(db *badger.DB, cors [][]byte) error { + entry := &badger.Entry{} + var version uint64 + err := getData(db, "dgraph.graphql.schema", func(item *badger.Item) error { + var err error + entry.Key = item.KeyCopy(nil) + entry.Value, err = item.ValueCopy(nil) + if err != nil { + return err + } + entry.UserMeta = item.UserMeta() + entry.ExpiresAt = item.ExpiresAt() + version = item.Version() + return nil + }) + if err != nil { + return err + } + if entry.Key == nil { + return nil + } + + var corsBytes []byte + corsBytes = append(corsBytes, []byte("\n\n\n# Below schema elements will only work for dgraph"+ + " versions >= 21.03. In older versions it will be ignored.")...) + for _, val := range cors { + corsBytes = append(corsBytes, []byte(fmt.Sprintf("\n# Dgraph.Allow-Origin \"%s\"", + string(val)))...) + } + + // Append the cors at the end of GraphQL schema. + pl := pb.PostingList{} + if err = pl.Unmarshal(entry.Value); err != nil { + return err + } + pl.Postings[0].Value = append(pl.Postings[0].Value, corsBytes...) + entry.Value, err = pl.Marshal() + if err != nil { + return err + } + + txn := db.NewTransactionAt(math.MaxUint64, true) + defer txn.Discard() + if err = txn.SetEntry(entry); err != nil { + return err + } + return txn.CommitAt(version, nil) +} + +func getCors(db *badger.DB) ([][]byte, error) { + var corsVals [][]byte + err := getData(db, "dgraph.cors", func(item *badger.Item) error { + val, err := item.ValueCopy(nil) + if err != nil { + return err + } + pl := pb.PostingList{} + if err := pl.Unmarshal(val); err != nil { + return err + } + for _, p := range pl.Postings { + corsVals = append(corsVals, p.Value) + } + return nil + }) + return corsVals, err +} + +func dropDepreciated(db *badger.DB) error { + var prefixes [][]byte + for pred := range deprecatedPreds { + pred = x.GalaxyAttr(pred) + prefixes = append(prefixes, x.SchemaKey(pred), x.PredicatePrefix(pred)) + } + for typ := range deprecatedTypes { + prefixes = append(prefixes, x.TypeKey(x.GalaxyAttr(typ))) + } + return db.DropPrefix(prefixes...) +} + +// fixCors removes the internal predicates from the restored backup. This also appends CORS from +// dgraph.cors to GraphQL schema. +func fixCors(db *badger.DB) error { + glog.Infof("Fixing cors information in the restored backup.") + cors, err := getCors(db) + if err != nil { + return errors.Wrapf(err, "Error while getting cors from db.") + } + if err := updateGQLSchemaOffline(db, cors); err != nil { + return errors.Wrapf(err, "Error while updating GraphQL schema.") + } + return errors.Wrapf(dropDepreciated(db), "Error while dropping depreciated preds/types.") +} + +// fixPersistedQuery, for the schema related to persisted query removes the deprecated field from +// the type and updates the index tokenizer for the predicate. +func fixPersistedQuery(db *badger.DB) error { + glog.Infof("Fixing persisted query schema in restored backup.") + update := func(entry *badger.Entry) error { + txn := db.NewTransactionAt(math.MaxUint64, true) + defer txn.Discard() + if err := txn.SetEntry(entry); err != nil { + return err + } + // Schema is written at version 1. + return txn.CommitAt(1, nil) + } + + // Update the tokenizer in the schema. + su := pb.SchemaUpdate{ + Predicate: x.GalaxyAttr("dgraph.graphql.p_query"), + ValueType: pb.Posting_STRING, + Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"sha256"}, + } + data, err := su.Marshal() + if err != nil { + return err + } + entry := &badger.Entry{} + entry.Key = x.SchemaKey(x.GalaxyAttr("dgraph.graphql.p_query")) + entry.Value = data + entry.UserMeta = posting.BitSchemaPosting + if err := update(entry); err != nil { + return errors.Wrap(err, "while updating persisted query's schema") + } + + // Update the type. + tu := pb.TypeUpdate{ + TypeName: x.GalaxyAttr("dgraph.graphql.persisted_query"), + Fields: []*pb.SchemaUpdate{&su}, + } + data, err = tu.Marshal() + if err != nil { + return err + } + entry = &badger.Entry{} + entry.Key = x.TypeKey(x.GalaxyAttr("dgraph.graphql.persisted_query")) + entry.Value = data + entry.UserMeta = posting.BitSchemaPosting + if err := update(entry); err != nil { + return errors.Wrap(err, "while updating persisted query's type") + } + return nil +} + +// OfflineUpgradeFrom2011To2103 upgrades a p directory restored from backup of 20.11 to the changes +// in 21.03. It fixes the cors, schema and drops the deprecated types/predicates. +func OfflineUpgradeFrom2011To2103(db *badger.DB) error { + if err := fixPersistedQuery(db); err != nil { + return errors.Wrapf(err, "while upgrading persisted query") + } + return errors.Wrapf(fixCors(db), "while upgrading cors") +} diff --git a/upgrade/upgrade.go b/upgrade/upgrade.go index a2e8093ec06..f20a1b9b3e6 100644 --- a/upgrade/upgrade.go +++ b/upgrade/upgrade.go @@ -44,6 +44,9 @@ const ( acl = "acl" dryRun = "dry-run" alpha = "alpha" + slashGrpc = "slash_grpc_endpoint" + authToken = "auth_token" + alphaHttp = "alpha-http" user = "user" password = "password" deleteOld = "deleteOld" @@ -152,12 +155,23 @@ func init() { flag := Upgrade.Cmd.Flags() flag.Bool(acl, false, "upgrade ACL from v1.2.2 to >=v20.03.0") flag.Bool(dryRun, false, "dry-run the upgrade") - flag.StringP(alpha, "a", "127.0.0.1:9080", "Dgraph Alpha gRPC server address") + flag.StringP(alpha, "a", "127.0.0.1:9080", + "Comma separated list of Dgraph Alpha gRPC server address") + flag.String(slashGrpc, "", "Path to Slash GraphQL GRPC endpoint. "+ + "If --slash_grpc_endpoint is set, all other TLS options and connection options will be "+ + "ignored") + flag.String(authToken, "", + "The auth token passed to the server for Alter operation of the schema file. "+ + "If used with --slash_grpc_endpoint, then this should be set to the API token issued"+ + "by Slash GraphQL") + flag.String(alphaHttp, "http://127.0.0.1:8080", "Draph Alpha HTTP(S) endpoint.") flag.StringP(user, "u", "", "Username of ACL user") flag.StringP(password, "p", "", "Password of ACL user") flag.BoolP(deleteOld, "d", true, "Delete the older ACL types/predicates") flag.StringP(from, "f", "", "The version string from which to upgrade, e.g.: v1.2.2") flag.StringP(to, "t", "", "The version string till which to upgrade, e.g.: v20.03.0") + + x.RegisterClientTLSFlags(flag) } func run() { @@ -167,6 +181,9 @@ func run() { return } + // Login using dgo client fetches the information from creds flag. + Upgrade.Conf.Set("creds", fmt.Sprintf("user=%s; password=%s; namespace=%d", + Upgrade.Conf.GetString(user), Upgrade.Conf.GetString(password), x.GalaxyNamespace)) applyChangeList(cmdInput, allChanges) } diff --git a/upgrade/utils.go b/upgrade/utils.go index ceed71d1271..d576fa68ad5 100644 --- a/upgrade/utils.go +++ b/upgrade/utils.go @@ -17,45 +17,151 @@ package upgrade import ( + "bytes" "context" "encoding/json" "fmt" + "io/ioutil" + "net/http" "strings" "time" "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" + "github.com/dgraph-io/dgraph/graphql/schema" "github.com/dgraph-io/dgraph/x" - "google.golang.org/grpc" + "github.com/pkg/errors" ) -// getDgoClient creates a gRPC connection and uses that to create a new dgo client. -// The gRPC.ClientConn returned by this must be closed after use. -func getDgoClient(withLogin bool) (*dgo.Dgraph, *grpc.ClientConn, error) { - alpha := Upgrade.Conf.GetString(alpha) +// getAccessJwt gets the access jwt token from by logging into the cluster. +func getAccessJwt() (*api.Jwt, error) { + user := Upgrade.Conf.GetString(user) + password := Upgrade.Conf.GetString(password) + header := http.Header{} + header.Set("X-Dgraph-AuthToken", Upgrade.Conf.GetString(authToken)) + updateSchemaParams := &GraphQLParams{ + Query: `mutation login($userId: String, $password: String, $namespace: Int) { + login(userId: $userId, password: $password, namespace: $namespace) { + response { + accessJWT + } + } + }`, + Variables: map[string]interface{}{"userId": user, "password": password, + "namespace": x.GalaxyNamespace}, + Headers: header, + } - // TODO(Aman): add TLS configuration. - conn, err := grpc.Dial(alpha, grpc.WithInsecure()) + adminUrl := Upgrade.Conf.GetString(alphaHttp) + "/admin" + resp, err := makeGqlRequest(updateSchemaParams, adminUrl) if err != nil { - return nil, nil, fmt.Errorf("unable to connect to Dgraph cluster: %w", err) + return nil, err + } + if len(resp.Errors) > 0 { + return nil, errors.Errorf("Error while updating the schema %s\n", resp.Errors.Error()) } - dg := dgo.NewDgraphClient(api.NewDgraphClient(conn)) - - if withLogin { - userName := Upgrade.Conf.GetString(user) - password := Upgrade.Conf.GetString(password) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - // login to cluster - //TODO(Ahsan): What should be the namespace here? - if err = dg.LoginIntoNamespace(ctx, userName, password, x.GalaxyNamespace); err != nil { - x.Check(conn.Close()) - return nil, nil, fmt.Errorf("unable to login to Dgraph cluster: %w", err) + type Response struct { + Login struct { + Response struct { + AccessJWT string + } } } + var r Response + if err := json.Unmarshal(resp.Data, &r); err != nil { + return nil, err + } + + jwt := &api.Jwt{AccessJwt: r.Login.Response.AccessJWT} + return jwt, nil +} + +type GraphQLParams struct { + Query string `json:"query"` + OperationName string `json:"operationName"` + Variables map[string]interface{} `json:"variables"` + Extensions *schema.RequestExtensions `json:"extensions,omitempty"` + Headers http.Header +} + +// GraphQLResponse GraphQL response structure. +// see https://graphql.github.io/graphql-spec/June2018/#sec-Response +type GraphQLResponse struct { + Data json.RawMessage `json:"data,omitempty"` + Errors x.GqlErrorList `json:"errors,omitempty"` + Extensions map[string]interface{} `json:"extensions,omitempty"` +} + +func makeGqlRequest(params *GraphQLParams, url string) (*GraphQLResponse, error) { + req, err := createGQLPost(params, url) + if err != nil { + return nil, err + } + for h := range params.Headers { + req.Header.Set(h, params.Headers.Get(h)) + } + res, err := runGQLRequest(req) + if err != nil { + return nil, err + } + + var result *GraphQLResponse + if err := json.Unmarshal(res, &result); err != nil { + return nil, err + } + + return result, nil +} + +func createGQLPost(params *GraphQLParams, url string) (*http.Request, error) { + body, err := json.Marshal(params) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + return req, nil +} + +// runGQLRequest runs a HTTP GraphQL request and returns the data or any errors. +func runGQLRequest(req *http.Request) ([]byte, error) { + config, err := x.LoadClientTLSConfig(Upgrade.Conf) + if err != nil { + return nil, err + } + tr := &http.Transport{TLSClientConfig: config} + client := &http.Client{Timeout: 50 * time.Second, Transport: tr} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + // GraphQL server should always return OK, even when there are errors + if status := resp.StatusCode; status != http.StatusOK { + return nil, errors.Errorf("unexpected status code: %v", status) + } + + if strings.ToLower(resp.Header.Get("Content-Type")) != "application/json" { + return nil, errors.Errorf("unexpected content type: %v", resp.Header.Get("Content-Type")) + } + + if resp.Header.Get("Access-Control-Allow-Origin") != "*" { + return nil, errors.Errorf("cors headers weren't set in response") + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Errorf("unable to read response body: %v", err) + } - return dg, conn, nil + return body, nil } // getQueryResult executes the given query and unmarshals the result in given pointer queryResPtr. diff --git a/worker/online_restore_ee.go b/worker/online_restore_ee.go index f9099b37d6b..566c3c6fa10 100644 --- a/worker/online_restore_ee.go +++ b/worker/online_restore_ee.go @@ -355,7 +355,11 @@ func writeBackup(ctx context.Context, req *pb.RestoreRequest) error { } maxUid, maxNsId, err := loadFromBackup(pstore, &loadBackupInput{ - r: gzReader, restoreTs: req.RestoreTs, preds: in.preds, dropOperations: in.dropOperations, + r: gzReader, + restoreTs: req.RestoreTs, + preds: in.preds, + dropOperations: in.dropOperations, + isOld: in.isOld, }) if err != nil { return 0, 0, errors.Wrapf(err, "cannot write backup") diff --git a/worker/restore.go b/worker/restore.go index ee1e24c4741..a9b43349a5e 100644 --- a/worker/restore.go +++ b/worker/restore.go @@ -63,6 +63,10 @@ func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, } return 0, 0, err } + + if !pathExist(dir) { + fmt.Println("Creating new db:", dir) + } // The badger DB should be opened only after creating the backup // file reader and verifying the encryption in the backup file. db, err := badger.OpenManaged(badger.DefaultOptions(dir). @@ -78,9 +82,6 @@ func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, return 0, 0, err } defer db.Close() - if !pathExist(dir) { - fmt.Println("Creating new db:", dir) - } maxUid, maxNsId, err := loadFromBackup(db, &loadBackupInput{ r: gzReader, restoreTs: 0, @@ -174,12 +175,8 @@ func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) } // Update the max uid and namespace id that has been seen while restoring this backup. - if parsedKey.Uid > maxUid { - maxUid = parsedKey.Uid - } - if namespace > maxNsId { - maxNsId = namespace - } + maxUid = x.Max(maxUid, parsedKey.Uid) + maxNsId = x.Max(maxNsId, namespace) // Override the version if requested. Should not be done for type and schema predicates, // which always have their version set to 1. diff --git a/worker/s3_handler.go b/worker/s3_handler.go index 79d7c5c81e6..e61cb646df5 100644 --- a/worker/s3_handler.go +++ b/worker/s3_handler.go @@ -250,12 +250,8 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, backupNum uint64, fn loa if err != nil { return LoadResult{Err: err} } - if groupMaxUid > maxUid { - maxUid = groupMaxUid - } - if groupMaxNsId > maxNsId { - maxNsId = groupMaxNsId - } + maxUid = x.Max(maxUid, groupMaxUid) + maxNsId = x.Max(maxNsId, groupMaxNsId) } since = manifest.Since }