diff --git a/ee/backup/run.go b/backup/run.go similarity index 92% rename from ee/backup/run.go rename to backup/run.go index 076dae32920..9404e6aa6c5 100644 --- a/ee/backup/run.go +++ b/backup/run.go @@ -1,13 +1,17 @@ -// +build !oss - /* - * Copyright 2018 Dgraph Labs, Inc. and Contributors + * Copyright 2021 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Licensed under the Dgraph Community License (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package backup diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index 3f995973715..a1e07a6ebe0 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -29,6 +29,7 @@ import ( "strings" "unicode" + "github.com/dgraph-io/dgraph/backup" "github.com/dgraph-io/dgraph/dgraph/cmd/alpha" "github.com/dgraph-io/dgraph/dgraph/cmd/bulk" "github.com/dgraph-io/dgraph/dgraph/cmd/cert" @@ -41,6 +42,7 @@ import ( "github.com/dgraph-io/dgraph/dgraph/cmd/migrate" "github.com/dgraph-io/dgraph/dgraph/cmd/version" "github.com/dgraph-io/dgraph/dgraph/cmd/zero" + "github.com/dgraph-io/dgraph/updatemanifest" "github.com/dgraph-io/dgraph/upgrade" "github.com/dgraph-io/dgraph/x" @@ -83,9 +85,10 @@ var rootConf = viper.New() // subcommands initially contains all default sub-commands. var subcommands = []*x.SubCommand{ - &bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, &version.Version, - &debug.Debug, &migrate.Migrate, &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt, - &increment.Increment, + &bulk.Bulk, &backup.LsBackup, &backup.ExportBackup, &cert.Cert, &conv.Conv, &live.Live, + &alpha.Alpha, &zero.Zero, &version.Version, &debug.Debug, &migrate.Migrate, + &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt, &increment.Increment, + &updatemanifest.UpdateManifest, } func initCmds() { diff --git a/dgraph/cmd/root_ee.go b/dgraph/cmd/root_ee.go index 80f5591eca5..282562888fa 100644 --- a/dgraph/cmd/root_ee.go +++ b/dgraph/cmd/root_ee.go @@ -15,17 +15,12 @@ package cmd import ( acl "github.com/dgraph-io/dgraph/ee/acl" "github.com/dgraph-io/dgraph/ee/audit" - "github.com/dgraph-io/dgraph/ee/backup" - "github.com/dgraph-io/dgraph/ee/updatemanifest" ) func init() { // subcommands already has the default subcommands, we append to EE ones to that. subcommands = append(subcommands, - &backup.LsBackup, - &backup.ExportBackup, &acl.CmdAcl, &audit.CmdAudit, - &updatemanifest.UpdateManifest, ) } diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index 0d1f7998cde..e9ef80e6b9a 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -412,6 +412,224 @@ const ( response: AssignedIds } + input BackupInput { + + """ + Destination for the backup: e.g. Minio or S3 bucket. + """ + destination: String! + + """ + Access key credential for the destination. + """ + accessKey: String + + """ + Secret key credential for the destination. 
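+	Not needed when the destination allows anonymous access (see 'anonymous' below).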
+ """ + secretKey: String + + """ + AWS session token, if required. + """ + sessionToken: String + + """ + Set to true to allow backing up to S3 or Minio bucket that requires no credentials. + """ + anonymous: Boolean + + """ + Force a full backup instead of an incremental backup. + """ + forceFull: Boolean + } + + type BackupPayload { + response: Response + taskId: String + } + + input RestoreInput { + + """ + Destination for the backup: e.g. Minio or S3 bucket. + """ + location: String! + + """ + Backup ID of the backup series to restore. This ID is included in the manifest.json file. + If missing, it defaults to the latest series. + """ + backupId: String + + """ + Number of the backup within the backup series to be restored. Backups with a greater value + will be ignored. If the value is zero or missing, the entire series will be restored. + """ + backupNum: Int + + """ + All the backups with num >= incrementalFrom will be restored. + """ + incrementalFrom: Int + + """ + If isPartial is set to true then the cluster will be kept in draining mode after + restore. This makes sure that the db is not corrupted by any mutations or tablet moves in + between two restores. + """ + isPartial: Boolean + + """ + Path to the key file needed to decrypt the backup. This file should be accessible + by all alphas in the group. The backup will be written using the encryption key + with which the cluster was started, which might be different than this key. + """ + encryptionKeyFile: String + + """ + Vault server address where the key is stored. This server must be accessible + by all alphas in the group. Default "http://localhost:8200". + """ + vaultAddr: String + + """ + Path to the Vault RoleID file. + """ + vaultRoleIDFile: String + + """ + Path to the Vault SecretID file. + """ + vaultSecretIDFile: String + + """ + Vault kv store path where the key lives. Default "secret/data/dgraph". + """ + vaultPath: String + + """ + Vault kv store field whose value is the key. Default "enc_key". + """ + vaultField: String + + """ + Vault kv store field's format. Must be "base64" or "raw". Default "base64". + """ + vaultFormat: String + + """ + Access key credential for the destination. + """ + accessKey: String + + """ + Secret key credential for the destination. + """ + secretKey: String + + """ + AWS session token, if required. + """ + sessionToken: String + + """ + Set to true to allow backing up to S3 or Minio bucket that requires no credentials. + """ + anonymous: Boolean + } + + type RestorePayload { + """ + A short string indicating whether the restore operation was successfully scheduled. + """ + code: String + + """ + Includes the error message if the operation failed. + """ + message: String + } + + input ListBackupsInput { + """ + Destination for the backup: e.g. Minio or S3 bucket. + """ + location: String! + + """ + Access key credential for the destination. + """ + accessKey: String + + """ + Secret key credential for the destination. + """ + secretKey: String + + """ + AWS session token, if required. + """ + sessionToken: String + + """ + Whether the destination doesn't require credentials (e.g. S3 public bucket). + """ + anonymous: Boolean + + } + + type BackupGroup { + """ + The ID of the cluster group. + """ + groupId: UInt64 + + """ + List of predicates assigned to the group. + """ + predicates: [String] + } + + type Manifest { + """ + Unique ID for the backup series. + """ + backupId: String + + """ + Number of this backup within the backup series. The full backup always has a value of one. 
+ """ + backupNum: UInt64 + + """ + Whether this backup was encrypted. + """ + encrypted: Boolean + + """ + List of groups and the predicates they store in this backup. + """ + groups: [BackupGroup] + + """ + Path to the manifest file. + """ + path: String + + """ + The timestamp at which this backup was taken. The next incremental backup will + start from this timestamp. + """ + since: UInt64 + + """ + The type of backup, either full or incremental. + """ + type: String + } + ` + adminTypes + ` type Query { @@ -421,6 +639,10 @@ const ( state: MembershipState config: Config task(input: TaskInput!): TaskPayload + """ + Get the information about the backups at a given location. + """ + listBackups(input: ListBackupsInput!) : [Manifest] ` + adminQueries + ` } @@ -474,6 +696,16 @@ const ( """ assign(input: AssignInput!): AssignPayload + """ + Start a binary backup. + """ + backup(input: BackupInput!) : BackupPayload + + """ + Start restoring a binary backup. + """ + restore(input: RestoreInput!) : RestorePayload + ` + adminMutations + ` } ` diff --git a/graphql/admin/backup.go b/graphql/admin/backup.go index f0fc836b0a1..23585a7dc89 100644 --- a/graphql/admin/backup.go +++ b/graphql/admin/backup.go @@ -34,12 +34,7 @@ type backupInput struct { } func resolveBackup(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) { - glog.Info("Got backup request") - if !worker.EnterpriseEnabled() { - err := fmt.Errorf("you must enable enterprise features first. " + - "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.") - return resolve.EmptyResult(m, err), false - } + glog.Info("Got a backup request") input, err := getBackupInput(m) if err != nil { diff --git a/graphql/admin/endpoints_ee.go b/graphql/admin/endpoints_ee.go index 34964c7f2bc..be224527ff7 100644 --- a/graphql/admin/endpoints_ee.go +++ b/graphql/admin/endpoints_ee.go @@ -13,223 +13,6 @@ package admin const adminTypes = ` - input BackupInput { - - """ - Destination for the backup: e.g. Minio or S3 bucket. - """ - destination: String! - - """ - Access key credential for the destination. - """ - accessKey: String - - """ - Secret key credential for the destination. - """ - secretKey: String - - """ - AWS session token, if required. - """ - sessionToken: String - - """ - Set to true to allow backing up to S3 or Minio bucket that requires no credentials. - """ - anonymous: Boolean - - """ - Force a full backup instead of an incremental backup. - """ - forceFull: Boolean - } - - type BackupPayload { - response: Response - taskId: String - } - - input RestoreInput { - - """ - Destination for the backup: e.g. Minio or S3 bucket. - """ - location: String! - - """ - Backup ID of the backup series to restore. This ID is included in the manifest.json file. - If missing, it defaults to the latest series. - """ - backupId: String - - """ - Number of the backup within the backup series to be restored. Backups with a greater value - will be ignored. If the value is zero or missing, the entire series will be restored. - """ - backupNum: Int - - """ - All the backups with num >= incrementalFrom will be restored. - """ - incrementalFrom: Int - - """ - If isPartial is set to true then the cluster will be kept in draining mode after - restore. This makes sure that the db is not corrupted by any mutations or tablet moves in - between two restores. - """ - isPartial: Boolean - - """ - Path to the key file needed to decrypt the backup. This file should be accessible - by all alphas in the group. 
The backup will be written using the encryption key - with which the cluster was started, which might be different than this key. - """ - encryptionKeyFile: String - - """ - Vault server address where the key is stored. This server must be accessible - by all alphas in the group. Default "http://localhost:8200". - """ - vaultAddr: String - - """ - Path to the Vault RoleID file. - """ - vaultRoleIDFile: String - - """ - Path to the Vault SecretID file. - """ - vaultSecretIDFile: String - - """ - Vault kv store path where the key lives. Default "secret/data/dgraph". - """ - vaultPath: String - - """ - Vault kv store field whose value is the key. Default "enc_key". - """ - vaultField: String - - """ - Vault kv store field's format. Must be "base64" or "raw". Default "base64". - """ - vaultFormat: String - - """ - Access key credential for the destination. - """ - accessKey: String - - """ - Secret key credential for the destination. - """ - secretKey: String - - """ - AWS session token, if required. - """ - sessionToken: String - - """ - Set to true to allow backing up to S3 or Minio bucket that requires no credentials. - """ - anonymous: Boolean - } - - type RestorePayload { - """ - A short string indicating whether the restore operation was successfully scheduled. - """ - code: String - - """ - Includes the error message if the operation failed. - """ - message: String - } - - input ListBackupsInput { - """ - Destination for the backup: e.g. Minio or S3 bucket. - """ - location: String! - - """ - Access key credential for the destination. - """ - accessKey: String - - """ - Secret key credential for the destination. - """ - secretKey: String - - """ - AWS session token, if required. - """ - sessionToken: String - - """ - Whether the destination doesn't require credentials (e.g. S3 public bucket). - """ - anonymous: Boolean - - } - - type BackupGroup { - """ - The ID of the cluster group. - """ - groupId: UInt64 - - """ - List of predicates assigned to the group. - """ - predicates: [String] - } - - type Manifest { - """ - Unique ID for the backup series. - """ - backupId: String - - """ - Number of this backup within the backup series. The full backup always has a value of one. - """ - backupNum: UInt64 - - """ - Whether this backup was encrypted. - """ - encrypted: Boolean - - """ - List of groups and the predicates they store in this backup. - """ - groups: [BackupGroup] - - """ - Path to the manifest file. - """ - path: String - - """ - The timestamp at which this backup was taken. The next incremental backup will - start from this timestamp. - """ - since: UInt64 - - """ - The type of backup, either full or incremental. - """ - type: String - } type LoginResponse { @@ -460,16 +243,6 @@ const adminTypes = ` const adminMutations = ` - """ - Start a binary backup. See : https://dgraph.io/docs/enterprise-features/#binary-backups - """ - backup(input: BackupInput!) : BackupPayload - - """ - Start restoring a binary backup. See : - https://dgraph.io/docs/enterprise-features/#binary-backups - """ - restore(input: RestoreInput!) : RestorePayload """ Login to Dgraph. Successful login results in a JWT that can be used in future requests. @@ -542,8 +315,4 @@ const adminQueries = ` queryUser(filter: UserFilter, order: UserOrder, first: Int, offset: Int): [User] queryGroup(filter: GroupFilter, order: GroupOrder, first: Int, offset: Int): [Group] - """ - Get the information about the backups at a given location. - """ - listBackups(input: ListBackupsInput!) 
: [Manifest] ` diff --git a/ee/updatemanifest/run.go b/updatemanifest/run.go similarity index 99% rename from ee/updatemanifest/run.go rename to updatemanifest/run.go index b9894eb708b..1d37e77e026 100644 --- a/ee/updatemanifest/run.go +++ b/updatemanifest/run.go @@ -1,5 +1,3 @@ -// +build !oss - /* * Copyright 2021 Dgraph Labs, Inc. and Contributors * diff --git a/worker/backup.go b/worker/backup.go index c2c920d2d14..2a27a532389 100644 --- a/worker/backup.go +++ b/worker/backup.go @@ -18,12 +18,30 @@ package worker import ( "context" + "encoding/binary" + "encoding/hex" + "fmt" + "io" "math" + "net/url" + "path/filepath" + "reflect" + "strings" "sync" + "time" "github.com/dgraph-io/badger/v3" + bpb "github.com/dgraph-io/badger/v3/pb" + "github.com/dgraph-io/badger/v3/y" + "github.com/dgraph-io/ristretto/z" + "github.com/golang/glog" + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" "github.com/pkg/errors" + ostats "go.opencensus.io/stats" + "github.com/dgraph-io/dgraph/ee/enc" + "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" ) @@ -122,3 +140,678 @@ func StoreExport(request *pb.ExportRequest, dir string, key x.Sensitive) error { db.Close() return errors.Wrapf(err, "cannot export data inside DB at %s", dir) } + +// Backup handles a request coming from another node. +func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupResponse, error) { + glog.V(2).Infof("Received backup request via Grpc: %+v", req) + return backupCurrentGroup(ctx, req) +} + +func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupResponse, error) { + glog.Infof("Backup request: group %d at %d", req.GroupId, req.ReadTs) + if err := ctx.Err(); err != nil { + glog.Errorf("Context error during backup: %v\n", err) + return nil, err + } + + g := groups() + if g.groupId() != req.GroupId { + return nil, errors.Errorf("Backup request group mismatch. Mine: %d. Requested: %d\n", + g.groupId(), req.GroupId) + } + + if err := posting.Oracle().WaitForTs(ctx, req.ReadTs); err != nil { + return nil, err + } + + closer, err := g.Node.startTaskAtTs(opBackup, req.ReadTs) + if err != nil { + return nil, errors.Wrapf(err, "cannot start backup operation") + } + defer closer.Done() + + bp := NewBackupProcessor(pstore, req) + defer bp.Close() + + return bp.WriteBackup(closer.Ctx()) +} + +// BackupGroup backs up the group specified in the backup request. +func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.BackupResponse, error) { + glog.V(2).Infof("Sending backup request: %+v\n", in) + if groups().groupId() == in.GroupId { + return backupCurrentGroup(ctx, in) + } + + // This node is not part of the requested group, send the request over the network. + pl := groups().AnyServer(in.GroupId) + if pl == nil { + return nil, errors.Errorf("Couldn't find a server in group %d", in.GroupId) + } + res, err := pb.NewWorkerClient(pl.Get()).Backup(ctx, in) + if err != nil { + glog.Errorf("Backup error group %d: %s", in.GroupId, err) + return nil, err + } + + return res, nil +} + +// backupLock is used to synchronize backups to avoid more than one backup request +// to be processed at the same time. Multiple requests could lead to multiple +// backups with the same backupNum in their manifest. +var backupLock sync.Mutex + +// BackupRes is used to represent the response and error of the Backup gRPC call together to be +// transported via a channel. 
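+// If err is non-nil, the caller ignores res.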
+type BackupRes struct { + res *pb.BackupResponse + err error +} + +func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error { + if err := x.HealthCheck(); err != nil { + glog.Errorf("Backup canceled, not ready to accept requests: %s", err) + return err + } + + // Grab the lock here to avoid more than one request to be processed at the same time. + backupLock.Lock() + defer backupLock.Unlock() + + backupSuccessful := false + ostats.Record(ctx, x.NumBackups.M(1), x.PendingBackups.M(1)) + defer func() { + if backupSuccessful { + ostats.Record(ctx, x.NumBackupsSuccess.M(1), x.PendingBackups.M(-1)) + } else { + ostats.Record(ctx, x.NumBackupsFailed.M(1), x.PendingBackups.M(-1)) + } + }() + + ts, err := Timestamps(ctx, &pb.Num{ReadOnly: true}) + if err != nil { + glog.Errorf("Unable to retrieve readonly timestamp for backup: %s", err) + return err + } + + req.ReadTs = ts.ReadOnly + req.UnixTs = time.Now().UTC().Format("20060102.150405.000") + + // Read the manifests to get the right timestamp from which to start the backup. + uri, err := url.Parse(req.Destination) + if err != nil { + return err + } + handler, err := x.NewUriHandler(uri, GetCredentialsFromRequest(req)) + if err != nil { + return err + } + latestManifest, err := GetLatestManifest(handler, uri) + if err != nil { + return err + } + + // Use the readTs as the sinceTs for the next backup. If not found, use the + // SinceTsDeprecated value from the latest manifest. + req.SinceTs = latestManifest.ValidReadTs() + + if req.ForceFull { + // To force a full backup we'll set the sinceTs to zero. + req.SinceTs = 0 + } else { + if x.WorkerConfig.EncryptionKey != nil { + // If encryption key given, latest backup should be encrypted. + if latestManifest.Type != "" && !latestManifest.Encrypted { + err = errors.Errorf("latest manifest indicates the last backup was not encrypted " + + "but this instance has encryption turned on. Try \"forceFull\" flag.") + return err + } + } else { + // If encryption turned off, latest backup should be unencrypted. + if latestManifest.Type != "" && latestManifest.Encrypted { + err = errors.Errorf("latest manifest indicates the last backup was encrypted " + + "but this instance has encryption turned off. Try \"forceFull\" flag.") + return err + } + } + } + + // Update the membership state to get the latest mapping of groups to predicates. + if err := UpdateMembershipState(ctx); err != nil { + return err + } + + // Get the current membership state and parse it for easier processing. + state := GetMembershipState() + var groups []uint32 + predMap := make(map[uint32][]string) + for gid, group := range state.Groups { + groups = append(groups, gid) + predMap[gid] = make([]string, 0) + for pred := range group.Tablets { + predMap[gid] = append(predMap[gid], pred) + } + } + + glog.Infof( + "Created backup request: read_ts:%d since_ts:%d unix_ts:%q destination:%q. Groups=%v\n", + req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var dropOperations []*pb.DropOperation + { // This is the code which sends out Backup requests and waits for them to finish. 
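+		// Fan-out: one goroutine per group; results are fanned back in via resCh below.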
+ resCh := make(chan BackupRes, len(state.Groups)) + for _, gid := range groups { + br := proto.Clone(req).(*pb.BackupRequest) + br.GroupId = gid + br.Predicates = predMap[gid] + go func(req *pb.BackupRequest) { + res, err := BackupGroup(ctx, req) + resCh <- BackupRes{res: res, err: err} + }(br) + } + + for range groups { + backupRes := <-resCh + if backupRes.err != nil { + glog.Errorf("Error received during backup: %v", backupRes.err) + return backupRes.err + } + dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...) + } + } + + dir := fmt.Sprintf(backupPathFmt, req.UnixTs) + m := Manifest{ + ReadTs: req.ReadTs, + Groups: predMap, + Version: x.ManifestVersion, + DropOperations: dropOperations, + Path: dir, + Compression: "snappy", + } + if req.SinceTs == 0 { + m.Type = "full" + m.BackupId = x.GetRandomName(1) + m.BackupNum = 1 + } else { + m.Type = "incremental" + m.BackupId = latestManifest.BackupId + m.BackupNum = latestManifest.BackupNum + 1 + } + m.Encrypted = (x.WorkerConfig.EncryptionKey != nil) + + bp := NewBackupProcessor(nil, req) + defer bp.Close() + err = bp.CompleteBackup(ctx, &m) + + if err != nil { + return err + } + + backupSuccessful = true + return nil +} + +func ProcessListBackups(ctx context.Context, location string, creds *x.MinioCredentials) ( + []*Manifest, error) { + + manifests, err := ListBackupManifests(location, creds) + if err != nil { + return nil, errors.Wrapf(err, "cannot read manifests at location %s", location) + } + + res := make([]*Manifest, 0) + for _, m := range manifests { + res = append(res, m) + } + return res, nil +} + +// BackupProcessor handles the different stages of the backup process. +type BackupProcessor struct { + // DB is the Badger pstore managed by this node. + DB *badger.DB + // Request stores the backup request containing the parameters for this backup. + Request *pb.BackupRequest + + // txn is used for the iterators in the threadLocal + txn *badger.Txn + threads []*threadLocal +} + +type threadLocal struct { + Request *pb.BackupRequest + // pre-allocated pb.PostingList object. + pl pb.PostingList + // pre-allocated pb.BackupPostingList object. + bpl pb.BackupPostingList + alloc *z.Allocator + itr *badger.Iterator + buf *z.Buffer +} + +func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor { + bp := &BackupProcessor{ + DB: db, + Request: req, + threads: make([]*threadLocal, x.WorkerConfig.Badger.NumGoroutines), + } + if req.SinceTs > 0 && db != nil { + bp.txn = db.NewTransactionAt(req.ReadTs, false) + } + for i := range bp.threads { + buf := z.NewBuffer(32<<20, "Worker.BackupProcessor"). + WithAutoMmap(1<<30, ""). + WithMaxSize(32 << 30) + + bp.threads[i] = &threadLocal{ + Request: bp.Request, + buf: buf, + } + if bp.txn != nil { + iopt := badger.DefaultIteratorOptions + iopt.AllVersions = true + bp.threads[i].itr = bp.txn.NewIterator(iopt) + } + } + return bp +} + +func (pr *BackupProcessor) Close() { + for _, th := range pr.threads { + if pr.txn != nil { + th.itr.Close() + } + th.buf.Release() + } + if pr.txn != nil { + pr.txn.Discard() + } +} + +// LoadResult holds the output of a Load operation. +type LoadResult struct { + // Version is the timestamp at which the database is after loading a backup. + Version uint64 + // MaxLeaseUid is the max UID seen by the load operation. Needed to request zero + // for the proper number of UIDs. + MaxLeaseUid uint64 + // MaxLeaseNsId is the max namespace ID seen by the load operation. 
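+	// Needed to request zero for the proper number of namespace IDs.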
+ MaxLeaseNsId uint64 + // The error, if any, of the load operation. + Err error +} + +func createBackupFile(h x.UriHandler, uri *url.URL, req *pb.BackupRequest) (io.WriteCloser, error) { + if !h.DirExists("./") { + if err := h.CreateDir("./"); err != nil { + return nil, errors.Wrap(err, "while creating backup file") + } + } + fileName := backupName(req.ReadTs, req.GroupId) + dir := fmt.Sprintf(backupPathFmt, req.UnixTs) + if err := h.CreateDir(dir); err != nil { + return nil, errors.Wrap(err, "while creating backup file") + } + backupFile := filepath.Join(dir, fileName) + w, err := h.CreateFile(backupFile) + return w, errors.Wrap(err, "while creating backup file") +} + +// WriteBackup uses the request values to create a stream writer then hand off the data +// retrieval to stream.Orchestrate. The writer will create all the fd's needed to +// collect the data and later move to the target. +// Returns errors on failure, nil on success. +func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + uri, err := url.Parse(pr.Request.Destination) + if err != nil { + return nil, err + } + handler, err := x.NewUriHandler(uri, GetCredentialsFromRequest(pr.Request)) + if err != nil { + return nil, err + } + w, err := createBackupFile(handler, uri, pr.Request) + if err != nil { + return nil, err + } + glog.V(3).Infof("Backup manifest version: %d", pr.Request.SinceTs) + + eWriter, err := enc.GetWriter(x.WorkerConfig.EncryptionKey, w) + if err != nil { + return nil, err + } + + // Snappy is much faster than gzip compression, even with the BestSpeed + // gzip option. In fact, in my experiments, gzip compression caused the + // output speed to be ~30 MBps. Snappy can write at ~90 MBps, and overall + // the speed is similar to writing uncompressed data on disk. + // + // These are the times I saw: + // Without compression: 7m2s 33GB output. + // With snappy: 7m11s 9.5GB output. + // With snappy + S3: 7m54s 9.5GB output. + cWriter := snappy.NewBufferedWriter(eWriter) + + stream := pr.DB.NewStreamAt(pr.Request.ReadTs) + stream.LogPrefix = "Dgraph.Backup" + // Ignore versions less than given sinceTs timestamp, or skip older versions of + // the given key by returning an empty list. + // Do not do this for schema and type keys. Those keys always have a + // version of one. They're handled separately. + stream.SinceTs = pr.Request.SinceTs + stream.Prefix = []byte{x.ByteData} + + var response pb.BackupResponse + stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { + tl := pr.threads[itr.ThreadId] + tl.alloc = itr.Alloc + + bitr := itr + // Use the threadlocal iterator because "itr" has the sinceTs set and + // it will not be able to read all the data. + if tl.itr != nil { + bitr = tl.itr + bitr.Seek(key) + } + + kvList, dropOp, err := tl.toBackupList(key, bitr) + if err != nil { + return nil, err + } + // we don't want to append a nil value to the slice, so need to check. + if dropOp != nil { + response.DropOperations = append(response.DropOperations, dropOp) + } + return kvList, nil + } + + predMap := make(map[string]struct{}) + for _, pred := range pr.Request.Predicates { + predMap[pred] = struct{}{} + } + stream.ChooseKey = func(item *badger.Item) bool { + parsedKey, err := x.Parse(item.Key()) + if err != nil { + glog.Errorf("error %v while parsing key %v during backup. 
Skipping...", + err, hex.EncodeToString(item.Key())) + return false + } + + // Do not choose keys that contain parts of a multi-part list. These keys + // will be accessed from the main list. + if parsedKey.HasStartUid { + return false + } + + // Skip backing up the schema and type keys. They will be backed up separately. + if parsedKey.IsSchema() || parsedKey.IsType() { + return false + } + _, ok := predMap[parsedKey.Attr] + return ok + } + + var maxVersion uint64 + stream.Send = func(buf *z.Buffer) error { + list, err := badger.BufferToKVList(buf) + if err != nil { + return err + } + for _, kv := range list.Kv { + if maxVersion < kv.Version { + maxVersion = kv.Version + } + } + return writeKVList(list, cWriter) + } + + // This is where the execution happens. + if err := stream.Orchestrate(ctx); err != nil { + glog.Errorf("While taking backup: %v", err) + return &response, err + } + + // This is used to backup the schema and types. + writePrefix := func(prefix byte) error { + tl := threadLocal{ + alloc: z.NewAllocator(1<<10, "BackupProcessor.WritePrefix"), + } + defer tl.alloc.Release() + + txn := pr.DB.NewTransactionAt(pr.Request.ReadTs, false) + defer txn.Discard() + // We don't need to iterate over all versions. + iopts := badger.DefaultIteratorOptions + iopts.Prefix = []byte{prefix} + + itr := txn.NewIterator(iopts) + defer itr.Close() + + list := &bpb.KVList{} + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + // Don't export deleted items. + if item.IsDeletedOrExpired() { + continue + } + parsedKey, err := x.Parse(item.Key()) + if err != nil { + glog.Errorf("error %v while parsing key %v during backup. Skipping...", + err, hex.EncodeToString(item.Key())) + continue + } + // This check makes sense only for the schema keys. The types are not stored in it. + if _, ok := predMap[parsedKey.Attr]; !parsedKey.IsType() && !ok { + continue + } + kv := y.NewKV(tl.alloc) + if err := item.Value(func(val []byte) error { + kv.Value = append(kv.Value, val...) + return nil + }); err != nil { + return errors.Wrapf(err, "while copying value") + } + + backupKey, err := tl.toBackupKey(item.Key()) + if err != nil { + return err + } + kv.Key = backupKey + kv.UserMeta = tl.alloc.Copy([]byte{item.UserMeta()}) + kv.Version = item.Version() + kv.ExpiresAt = item.ExpiresAt() + list.Kv = append(list.Kv, kv) + } + return writeKVList(list, cWriter) + } + + for _, prefix := range []byte{x.ByteSchema, x.ByteType} { + if err := writePrefix(prefix); err != nil { + glog.Errorf("While writing prefix %d to backup: %v", prefix, err) + return &response, err + } + } + + if maxVersion > pr.Request.ReadTs { + glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)", + maxVersion, pr.Request.ReadTs) + } + + glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs) + if err = cWriter.Close(); err != nil { + glog.Errorf("While closing gzipped writer: %v", err) + return &response, err + } + + if err = w.Close(); err != nil { + glog.Errorf("While closing handler: %v", err) + return &response, err + } + glog.Infof("Backup complete: group %d at %d", pr.Request.GroupId, pr.Request.ReadTs) + return &response, nil +} + +// CompleteBackup will finalize a backup by writing the manifest at the backup destination. 
+func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) error { + if err := ctx.Err(); err != nil { + return err + } + uri, err := url.Parse(pr.Request.Destination) + if err != nil { + return err + } + handler, err := x.NewUriHandler(uri, GetCredentialsFromRequest(pr.Request)) + if err != nil { + return err + } + + manifest, err := GetManifestNoUpgrade(handler, uri) + if err != nil { + return err + } + manifest.Manifests = append(manifest.Manifests, m) + + if err := CreateManifest(handler, uri, manifest); err != nil { + return errors.Wrap(err, "Complete backup failed") + } + glog.Infof("Backup completed OK.") + return nil +} + +// GoString implements the GoStringer interface for Manifest. +func (m *Manifest) GoString() string { + return fmt.Sprintf(`Manifest{Since: %d, ReadTs: %d, Groups: %v, Encrypted: %v}`, + m.SinceTsDeprecated, m.ReadTs, m.Groups, m.Encrypted) +} + +func (tl *threadLocal) toBackupList(key []byte, itr *badger.Iterator) ( + *bpb.KVList, *pb.DropOperation, error) { + list := &bpb.KVList{} + var dropOp *pb.DropOperation + + item := itr.Item() + if item.Version() < tl.Request.SinceTs { + return list, nil, + errors.Errorf("toBackupList: Item.Version(): %d should be less than sinceTs: %d", + item.Version(), tl.Request.SinceTs) + } + if item.IsDeletedOrExpired() { + return list, nil, nil + } + + switch item.UserMeta() { + case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: + l, err := posting.ReadPostingList(key, itr) + if err != nil { + return nil, nil, errors.Wrapf(err, "while reading posting list") + } + + // Don't allocate kv on tl.alloc, because we don't need it by the end of this func. + kv, err := l.ToBackupPostingList(&tl.bpl, tl.alloc, tl.buf) + if err != nil { + return nil, nil, errors.Wrapf(err, "while rolling up list") + } + + backupKey, err := tl.toBackupKey(kv.Key) + if err != nil { + return nil, nil, err + } + + // check if this key was storing a DROP operation record. If yes, get the drop operation. + dropOp, err = checkAndGetDropOp(key, l, tl.Request.ReadTs) + if err != nil { + return nil, nil, err + } + + kv.Key = backupKey + list.Kv = append(list.Kv, kv) + default: + return nil, nil, errors.Errorf( + "Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key)) + } + return list, dropOp, nil +} + +func (tl *threadLocal) toBackupKey(key []byte) ([]byte, error) { + parsedKey, err := x.Parse(key) + if err != nil { + return nil, errors.Wrapf(err, "could not parse key %s", hex.Dump(key)) + } + bk := parsedKey.ToBackupKey() + + out := tl.alloc.Allocate(bk.Size()) + n, err := bk.MarshalToSizedBuffer(out) + return out[:n], err +} + +func writeKVList(list *bpb.KVList, w io.Writer) error { + if err := binary.Write(w, binary.LittleEndian, uint64(list.Size())); err != nil { + return err + } + buf, err := list.Marshal() + if err != nil { + return err + } + _, err = w.Write(buf) + return err +} + +func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOperation, error) { + isDropOpKey, err := x.IsDropOpKey(key) + if err != nil || !isDropOpKey { + return nil, err + } + + vals, err := l.AllValues(readTs) + if err != nil { + return nil, errors.Wrapf(err, "cannot read value of dgraph.drop.op") + } + switch len(vals) { + case 0: + // do nothing, it means this one was deleted with S * * deletion. + // So, no need to consider it. 
+ return nil, nil + case 1: + val, ok := vals[0].Value.([]byte) + if !ok { + return nil, errors.Errorf("cannot convert value of dgraph.drop.op to byte array, "+ + "got type: %s, value: %v, tid: %v", reflect.TypeOf(vals[0].Value), vals[0].Value, + vals[0].Tid) + } + // A dgraph.drop.op record can have values in only one of the following formats: + // * DROP_ALL; + // * DROP_DATA;ns + // * DROP_ATTR;attrName + // * DROP_NS;ns + // So, accordingly construct the *pb.DropOperation. + dropOp := &pb.DropOperation{} + dropInfo := strings.SplitN(string(val), ";", 2) + if len(dropInfo) != 2 { + return nil, errors.Errorf("Unexpected value: %s for dgraph.drop.op", val) + } + switch dropInfo[0] { + case "DROP_ALL": + dropOp.DropOp = pb.DropOperation_ALL + case "DROP_DATA": + dropOp.DropOp = pb.DropOperation_DATA + dropOp.DropValue = dropInfo[1] // contains namespace. + case "DROP_ATTR": + dropOp.DropOp = pb.DropOperation_ATTR + dropOp.DropValue = dropInfo[1] + case "DROP_NS": + dropOp.DropOp = pb.DropOperation_NS + dropOp.DropValue = dropInfo[1] // contains namespace. + } + return dropOp, nil + default: + // getting more than one values for a non-list predicate is an error + return nil, errors.Errorf("found multiple values for dgraph.drop.op: %v", vals) + } +} diff --git a/worker/backup_ee.go b/worker/backup_ee.go deleted file mode 100644 index 13137916a20..00000000000 --- a/worker/backup_ee.go +++ /dev/null @@ -1,717 +0,0 @@ -// +build !oss - -/* - * Copyright 2018 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Dgraph Community License (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt - */ - -package worker - -import ( - "context" - "encoding/binary" - "encoding/hex" - "fmt" - "io" - "net/url" - "path/filepath" - "reflect" - "strings" - "sync" - "time" - - "github.com/dgraph-io/badger/v3" - bpb "github.com/dgraph-io/badger/v3/pb" - "github.com/dgraph-io/badger/v3/y" - "github.com/dgraph-io/dgraph/ee/enc" - "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/protos/pb" - "github.com/dgraph-io/dgraph/x" - "github.com/dgraph-io/ristretto/z" - ostats "go.opencensus.io/stats" - - "github.com/golang/glog" - "github.com/golang/protobuf/proto" - "github.com/golang/snappy" - "github.com/pkg/errors" -) - -// Backup handles a request coming from another node. -func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupResponse, error) { - glog.V(2).Infof("Received backup request via Grpc: %+v", req) - return backupCurrentGroup(ctx, req) -} - -func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupResponse, error) { - glog.Infof("Backup request: group %d at %d", req.GroupId, req.ReadTs) - if err := ctx.Err(); err != nil { - glog.Errorf("Context error during backup: %v\n", err) - return nil, err - } - - g := groups() - if g.groupId() != req.GroupId { - return nil, errors.Errorf("Backup request group mismatch. Mine: %d. 
Requested: %d\n", - g.groupId(), req.GroupId) - } - - if err := posting.Oracle().WaitForTs(ctx, req.ReadTs); err != nil { - return nil, err - } - - closer, err := g.Node.startTaskAtTs(opBackup, req.ReadTs) - if err != nil { - return nil, errors.Wrapf(err, "cannot start backup operation") - } - defer closer.Done() - - bp := NewBackupProcessor(pstore, req) - defer bp.Close() - - return bp.WriteBackup(closer.Ctx()) -} - -// BackupGroup backs up the group specified in the backup request. -func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.BackupResponse, error) { - glog.V(2).Infof("Sending backup request: %+v\n", in) - if groups().groupId() == in.GroupId { - return backupCurrentGroup(ctx, in) - } - - // This node is not part of the requested group, send the request over the network. - pl := groups().AnyServer(in.GroupId) - if pl == nil { - return nil, errors.Errorf("Couldn't find a server in group %d", in.GroupId) - } - res, err := pb.NewWorkerClient(pl.Get()).Backup(ctx, in) - if err != nil { - glog.Errorf("Backup error group %d: %s", in.GroupId, err) - return nil, err - } - - return res, nil -} - -// backupLock is used to synchronize backups to avoid more than one backup request -// to be processed at the same time. Multiple requests could lead to multiple -// backups with the same backupNum in their manifest. -var backupLock sync.Mutex - -// BackupRes is used to represent the response and error of the Backup gRPC call together to be -// transported via a channel. -type BackupRes struct { - res *pb.BackupResponse - err error -} - -func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error { - if err := x.HealthCheck(); err != nil { - glog.Errorf("Backup canceled, not ready to accept requests: %s", err) - return err - } - - // Grab the lock here to avoid more than one request to be processed at the same time. - backupLock.Lock() - defer backupLock.Unlock() - - backupSuccessful := false - ostats.Record(ctx, x.NumBackups.M(1), x.PendingBackups.M(1)) - defer func() { - if backupSuccessful { - ostats.Record(ctx, x.NumBackupsSuccess.M(1), x.PendingBackups.M(-1)) - } else { - ostats.Record(ctx, x.NumBackupsFailed.M(1), x.PendingBackups.M(-1)) - } - }() - - ts, err := Timestamps(ctx, &pb.Num{ReadOnly: true}) - if err != nil { - glog.Errorf("Unable to retrieve readonly timestamp for backup: %s", err) - return err - } - - req.ReadTs = ts.ReadOnly - req.UnixTs = time.Now().UTC().Format("20060102.150405.000") - - // Read the manifests to get the right timestamp from which to start the backup. - uri, err := url.Parse(req.Destination) - if err != nil { - return err - } - handler, err := x.NewUriHandler(uri, GetCredentialsFromRequest(req)) - if err != nil { - return err - } - latestManifest, err := GetLatestManifest(handler, uri) - if err != nil { - return err - } - - // Use the readTs as the sinceTs for the next backup. If not found, use the - // SinceTsDeprecated value from the latest manifest. - req.SinceTs = latestManifest.ValidReadTs() - - if req.ForceFull { - // To force a full backup we'll set the sinceTs to zero. - req.SinceTs = 0 - } else { - if x.WorkerConfig.EncryptionKey != nil { - // If encryption key given, latest backup should be encrypted. - if latestManifest.Type != "" && !latestManifest.Encrypted { - err = errors.Errorf("latest manifest indicates the last backup was not encrypted " + - "but this instance has encryption turned on. Try \"forceFull\" flag.") - return err - } - } else { - // If encryption turned off, latest backup should be unencrypted. 
- if latestManifest.Type != "" && latestManifest.Encrypted { - err = errors.Errorf("latest manifest indicates the last backup was encrypted " + - "but this instance has encryption turned off. Try \"forceFull\" flag.") - return err - } - } - } - - // Update the membership state to get the latest mapping of groups to predicates. - if err := UpdateMembershipState(ctx); err != nil { - return err - } - - // Get the current membership state and parse it for easier processing. - state := GetMembershipState() - var groups []uint32 - predMap := make(map[uint32][]string) - for gid, group := range state.Groups { - groups = append(groups, gid) - predMap[gid] = make([]string, 0) - for pred := range group.Tablets { - predMap[gid] = append(predMap[gid], pred) - } - } - - glog.Infof( - "Created backup request: read_ts:%d since_ts:%d unix_ts:%q destination:%q. Groups=%v\n", - req.ReadTs, req.SinceTs, req.UnixTs, req.Destination, groups) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - var dropOperations []*pb.DropOperation - { // This is the code which sends out Backup requests and waits for them to finish. - resCh := make(chan BackupRes, len(state.Groups)) - for _, gid := range groups { - br := proto.Clone(req).(*pb.BackupRequest) - br.GroupId = gid - br.Predicates = predMap[gid] - go func(req *pb.BackupRequest) { - res, err := BackupGroup(ctx, req) - resCh <- BackupRes{res: res, err: err} - }(br) - } - - for range groups { - backupRes := <-resCh - if backupRes.err != nil { - glog.Errorf("Error received during backup: %v", backupRes.err) - return backupRes.err - } - dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...) - } - } - - dir := fmt.Sprintf(backupPathFmt, req.UnixTs) - m := Manifest{ - ReadTs: req.ReadTs, - Groups: predMap, - Version: x.ManifestVersion, - DropOperations: dropOperations, - Path: dir, - Compression: "snappy", - } - if req.SinceTs == 0 { - m.Type = "full" - m.BackupId = x.GetRandomName(1) - m.BackupNum = 1 - } else { - m.Type = "incremental" - m.BackupId = latestManifest.BackupId - m.BackupNum = latestManifest.BackupNum + 1 - } - m.Encrypted = (x.WorkerConfig.EncryptionKey != nil) - - bp := NewBackupProcessor(nil, req) - defer bp.Close() - err = bp.CompleteBackup(ctx, &m) - - if err != nil { - return err - } - - backupSuccessful = true - return nil -} - -func ProcessListBackups(ctx context.Context, location string, creds *x.MinioCredentials) ( - []*Manifest, error) { - - manifests, err := ListBackupManifests(location, creds) - if err != nil { - return nil, errors.Wrapf(err, "cannot read manifests at location %s", location) - } - - res := make([]*Manifest, 0) - for _, m := range manifests { - res = append(res, m) - } - return res, nil -} - -// BackupProcessor handles the different stages of the backup process. -type BackupProcessor struct { - // DB is the Badger pstore managed by this node. - DB *badger.DB - // Request stores the backup request containing the parameters for this backup. - Request *pb.BackupRequest - - // txn is used for the iterators in the threadLocal - txn *badger.Txn - threads []*threadLocal -} - -type threadLocal struct { - Request *pb.BackupRequest - // pre-allocated pb.PostingList object. - pl pb.PostingList - // pre-allocated pb.BackupPostingList object. 
- bpl pb.BackupPostingList - alloc *z.Allocator - itr *badger.Iterator - buf *z.Buffer -} - -func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor { - bp := &BackupProcessor{ - DB: db, - Request: req, - threads: make([]*threadLocal, x.WorkerConfig.Badger.NumGoroutines), - } - if req.SinceTs > 0 && db != nil { - bp.txn = db.NewTransactionAt(req.ReadTs, false) - } - for i := range bp.threads { - buf := z.NewBuffer(32<<20, "Worker.BackupProcessor"). - WithAutoMmap(1<<30, ""). - WithMaxSize(32 << 30) - - bp.threads[i] = &threadLocal{ - Request: bp.Request, - buf: buf, - } - if bp.txn != nil { - iopt := badger.DefaultIteratorOptions - iopt.AllVersions = true - bp.threads[i].itr = bp.txn.NewIterator(iopt) - } - } - return bp -} - -func (pr *BackupProcessor) Close() { - for _, th := range pr.threads { - if pr.txn != nil { - th.itr.Close() - } - th.buf.Release() - } - if pr.txn != nil { - pr.txn.Discard() - } -} - -// LoadResult holds the output of a Load operation. -type LoadResult struct { - // Version is the timestamp at which the database is after loading a backup. - Version uint64 - // MaxLeaseUid is the max UID seen by the load operation. Needed to request zero - // for the proper number of UIDs. - MaxLeaseUid uint64 - // MaxLeaseNsId is the max namespace ID seen by the load operation. - MaxLeaseNsId uint64 - // The error, if any, of the load operation. - Err error -} - -func createBackupFile(h x.UriHandler, uri *url.URL, req *pb.BackupRequest) (io.WriteCloser, error) { - if !h.DirExists("./") { - if err := h.CreateDir("./"); err != nil { - return nil, errors.Wrap(err, "while creating backup file") - } - } - fileName := backupName(req.ReadTs, req.GroupId) - dir := fmt.Sprintf(backupPathFmt, req.UnixTs) - if err := h.CreateDir(dir); err != nil { - return nil, errors.Wrap(err, "while creating backup file") - } - backupFile := filepath.Join(dir, fileName) - w, err := h.CreateFile(backupFile) - return w, errors.Wrap(err, "while creating backup file") -} - -// WriteBackup uses the request values to create a stream writer then hand off the data -// retrieval to stream.Orchestrate. The writer will create all the fd's needed to -// collect the data and later move to the target. -// Returns errors on failure, nil on success. -func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse, error) { - if err := ctx.Err(); err != nil { - return nil, err - } - uri, err := url.Parse(pr.Request.Destination) - if err != nil { - return nil, err - } - handler, err := x.NewUriHandler(uri, GetCredentialsFromRequest(pr.Request)) - if err != nil { - return nil, err - } - w, err := createBackupFile(handler, uri, pr.Request) - if err != nil { - return nil, err - } - glog.V(3).Infof("Backup manifest version: %d", pr.Request.SinceTs) - - eWriter, err := enc.GetWriter(x.WorkerConfig.EncryptionKey, w) - if err != nil { - return nil, err - } - - // Snappy is much faster than gzip compression, even with the BestSpeed - // gzip option. In fact, in my experiments, gzip compression caused the - // output speed to be ~30 MBps. Snappy can write at ~90 MBps, and overall - // the speed is similar to writing uncompressed data on disk. - // - // These are the times I saw: - // Without compression: 7m2s 33GB output. - // With snappy: 7m11s 9.5GB output. - // With snappy + S3: 7m54s 9.5GB output. 
- cWriter := snappy.NewBufferedWriter(eWriter) - - stream := pr.DB.NewStreamAt(pr.Request.ReadTs) - stream.LogPrefix = "Dgraph.Backup" - // Ignore versions less than given sinceTs timestamp, or skip older versions of - // the given key by returning an empty list. - // Do not do this for schema and type keys. Those keys always have a - // version of one. They're handled separately. - stream.SinceTs = pr.Request.SinceTs - stream.Prefix = []byte{x.ByteData} - - var response pb.BackupResponse - stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { - tl := pr.threads[itr.ThreadId] - tl.alloc = itr.Alloc - - bitr := itr - // Use the threadlocal iterator because "itr" has the sinceTs set and - // it will not be able to read all the data. - if tl.itr != nil { - bitr = tl.itr - bitr.Seek(key) - } - - kvList, dropOp, err := tl.toBackupList(key, bitr) - if err != nil { - return nil, err - } - // we don't want to append a nil value to the slice, so need to check. - if dropOp != nil { - response.DropOperations = append(response.DropOperations, dropOp) - } - return kvList, nil - } - - predMap := make(map[string]struct{}) - for _, pred := range pr.Request.Predicates { - predMap[pred] = struct{}{} - } - stream.ChooseKey = func(item *badger.Item) bool { - parsedKey, err := x.Parse(item.Key()) - if err != nil { - glog.Errorf("error %v while parsing key %v during backup. Skipping...", - err, hex.EncodeToString(item.Key())) - return false - } - - // Do not choose keys that contain parts of a multi-part list. These keys - // will be accessed from the main list. - if parsedKey.HasStartUid { - return false - } - - // Skip backing up the schema and type keys. They will be backed up separately. - if parsedKey.IsSchema() || parsedKey.IsType() { - return false - } - _, ok := predMap[parsedKey.Attr] - return ok - } - - var maxVersion uint64 - stream.Send = func(buf *z.Buffer) error { - list, err := badger.BufferToKVList(buf) - if err != nil { - return err - } - for _, kv := range list.Kv { - if maxVersion < kv.Version { - maxVersion = kv.Version - } - } - return writeKVList(list, cWriter) - } - - // This is where the execution happens. - if err := stream.Orchestrate(ctx); err != nil { - glog.Errorf("While taking backup: %v", err) - return &response, err - } - - // This is used to backup the schema and types. - writePrefix := func(prefix byte) error { - tl := threadLocal{ - alloc: z.NewAllocator(1<<10, "BackupProcessor.WritePrefix"), - } - defer tl.alloc.Release() - - txn := pr.DB.NewTransactionAt(pr.Request.ReadTs, false) - defer txn.Discard() - // We don't need to iterate over all versions. - iopts := badger.DefaultIteratorOptions - iopts.Prefix = []byte{prefix} - - itr := txn.NewIterator(iopts) - defer itr.Close() - - list := &bpb.KVList{} - for itr.Rewind(); itr.Valid(); itr.Next() { - item := itr.Item() - // Don't export deleted items. - if item.IsDeletedOrExpired() { - continue - } - parsedKey, err := x.Parse(item.Key()) - if err != nil { - glog.Errorf("error %v while parsing key %v during backup. Skipping...", - err, hex.EncodeToString(item.Key())) - continue - } - // This check makes sense only for the schema keys. The types are not stored in it. - if _, ok := predMap[parsedKey.Attr]; !parsedKey.IsType() && !ok { - continue - } - kv := y.NewKV(tl.alloc) - if err := item.Value(func(val []byte) error { - kv.Value = append(kv.Value, val...) 
-				return nil
-			}); err != nil {
-				return errors.Wrapf(err, "while copying value")
-			}
-
-			backupKey, err := tl.toBackupKey(item.Key())
-			if err != nil {
-				return err
-			}
-			kv.Key = backupKey
-			kv.UserMeta = tl.alloc.Copy([]byte{item.UserMeta()})
-			kv.Version = item.Version()
-			kv.ExpiresAt = item.ExpiresAt()
-			list.Kv = append(list.Kv, kv)
-		}
-		return writeKVList(list, cWriter)
-	}
-
-	for _, prefix := range []byte{x.ByteSchema, x.ByteType} {
-		if err := writePrefix(prefix); err != nil {
-			glog.Errorf("While writing prefix %d to backup: %v", prefix, err)
-			return &response, err
-		}
-	}
-
-	if maxVersion > pr.Request.ReadTs {
-		glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)",
-			maxVersion, pr.Request.ReadTs)
-	}
-
-	glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs)
-	if err = cWriter.Close(); err != nil {
-		glog.Errorf("While closing gzipped writer: %v", err)
-		return &response, err
-	}
-
-	if err = w.Close(); err != nil {
-		glog.Errorf("While closing handler: %v", err)
-		return &response, err
-	}
-	glog.Infof("Backup complete: group %d at %d", pr.Request.GroupId, pr.Request.ReadTs)
-	return &response, nil
-}
-
-// CompleteBackup will finalize a backup by writing the manifest at the backup destination.
-func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) error {
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-	uri, err := url.Parse(pr.Request.Destination)
-	if err != nil {
-		return err
-	}
-	handler, err := x.NewUriHandler(uri, GetCredentialsFromRequest(pr.Request))
-	if err != nil {
-		return err
-	}
-
-	manifest, err := GetManifestNoUpgrade(handler, uri)
-	if err != nil {
-		return err
-	}
-	manifest.Manifests = append(manifest.Manifests, m)
-
-	if err := CreateManifest(handler, uri, manifest); err != nil {
-		return errors.Wrap(err, "Complete backup failed")
-	}
-	glog.Infof("Backup completed OK.")
-	return nil
-}
-
-// GoString implements the GoStringer interface for Manifest.
-func (m *Manifest) GoString() string {
-	return fmt.Sprintf(`Manifest{Since: %d, ReadTs: %d, Groups: %v, Encrypted: %v}`,
-		m.SinceTsDeprecated, m.ReadTs, m.Groups, m.Encrypted)
-}
-
-func (tl *threadLocal) toBackupList(key []byte, itr *badger.Iterator) (
-	*bpb.KVList, *pb.DropOperation, error) {
-	list := &bpb.KVList{}
-	var dropOp *pb.DropOperation
-
-	item := itr.Item()
-	if item.Version() < tl.Request.SinceTs {
-		return list, nil,
-			errors.Errorf("toBackupList: Item.Version(): %d should be less than sinceTs: %d",
-				item.Version(), tl.Request.SinceTs)
-	}
-	if item.IsDeletedOrExpired() {
-		return list, nil, nil
-	}
-
-	switch item.UserMeta() {
-	case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
-		l, err := posting.ReadPostingList(key, itr)
-		if err != nil {
-			return nil, nil, errors.Wrapf(err, "while reading posting list")
-		}
-
-		// Don't allocate kv on tl.alloc, because we don't need it by the end of this func.
-		kv, err := l.ToBackupPostingList(&tl.bpl, tl.alloc, tl.buf)
-		if err != nil {
-			return nil, nil, errors.Wrapf(err, "while rolling up list")
-		}
-
-		backupKey, err := tl.toBackupKey(kv.Key)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		// check if this key was storing a DROP operation record. If yes, get the drop operation.
-		dropOp, err = checkAndGetDropOp(key, l, tl.Request.ReadTs)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		kv.Key = backupKey
-		list.Kv = append(list.Kv, kv)
-	default:
-		return nil, nil, errors.Errorf(
-			"Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key))
-	}
-	return list, dropOp, nil
-}
-
-func (tl *threadLocal) toBackupKey(key []byte) ([]byte, error) {
-	parsedKey, err := x.Parse(key)
-	if err != nil {
-		return nil, errors.Wrapf(err, "could not parse key %s", hex.Dump(key))
-	}
-	bk := parsedKey.ToBackupKey()
-
-	out := tl.alloc.Allocate(bk.Size())
-	n, err := bk.MarshalToSizedBuffer(out)
-	return out[:n], err
-}
-
-func writeKVList(list *bpb.KVList, w io.Writer) error {
-	if err := binary.Write(w, binary.LittleEndian, uint64(list.Size())); err != nil {
-		return err
-	}
-	buf, err := list.Marshal()
-	if err != nil {
-		return err
-	}
-	_, err = w.Write(buf)
-	return err
-}
-
-func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOperation, error) {
-	isDropOpKey, err := x.IsDropOpKey(key)
-	if err != nil || !isDropOpKey {
-		return nil, err
-	}
-
-	vals, err := l.AllValues(readTs)
-	if err != nil {
-		return nil, errors.Wrapf(err, "cannot read value of dgraph.drop.op")
-	}
-	switch len(vals) {
-	case 0:
-		// do nothing, it means this one was deleted with S * * deletion.
-		// So, no need to consider it.
-		return nil, nil
-	case 1:
-		val, ok := vals[0].Value.([]byte)
-		if !ok {
-			return nil, errors.Errorf("cannot convert value of dgraph.drop.op to byte array, "+
-				"got type: %s, value: %v, tid: %v", reflect.TypeOf(vals[0].Value), vals[0].Value,
-				vals[0].Tid)
-		}
-		// A dgraph.drop.op record can have values in only one of the following formats:
-		// * DROP_ALL;
-		// * DROP_DATA;ns
-		// * DROP_ATTR;attrName
-		// * DROP_NS;ns
-		// So, accordingly construct the *pb.DropOperation.
-		dropOp := &pb.DropOperation{}
-		dropInfo := strings.SplitN(string(val), ";", 2)
-		if len(dropInfo) != 2 {
-			return nil, errors.Errorf("Unexpected value: %s for dgraph.drop.op", val)
-		}
-		switch dropInfo[0] {
-		case "DROP_ALL":
-			dropOp.DropOp = pb.DropOperation_ALL
-		case "DROP_DATA":
-			dropOp.DropOp = pb.DropOperation_DATA
-			dropOp.DropValue = dropInfo[1] // contains namespace.
-		case "DROP_ATTR":
-			dropOp.DropOp = pb.DropOperation_ATTR
-			dropOp.DropValue = dropInfo[1]
-		case "DROP_NS":
-			dropOp.DropOp = pb.DropOperation_NS
-			dropOp.DropValue = dropInfo[1] // contains namespace.
-		}
-		return dropOp, nil
-	default:
-		// getting more than one values for a non-list predicate is an error
-		return nil, errors.Errorf("found multiple values for dgraph.drop.op: %v", vals)
-	}
-}
diff --git a/worker/backup_manifest.go b/worker/backup_manifest.go
index c541aaab8dc..6ae3ccc5122 100644
--- a/worker/backup_manifest.go
+++ b/worker/backup_manifest.go
@@ -1,13 +1,17 @@
-// +build !oss
-
 /*
  * Copyright 2021 Dgraph Labs, Inc. and Contributors
  *
- * Licensed under the Dgraph Community License (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *     https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package worker
diff --git a/worker/backup_oss.go b/worker/backup_oss.go
deleted file mode 100644
index 1472b902809..00000000000
--- a/worker/backup_oss.go
+++ /dev/null
@@ -1,46 +0,0 @@
-// +build oss
-
-/*
- * Copyright 2018 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package worker
-
-import (
-	"context"
-
-	"github.com/dgraph-io/dgraph/protos/pb"
-	"github.com/dgraph-io/dgraph/x"
-	"github.com/golang/glog"
-)
-
-// Backup implements the Worker interface.
-func (w *grpcWorker) Backup(
-	ctx context.Context, req *pb.BackupRequest) (*pb.BackupResponse, error) {
-
-	glog.Warningf("Backup failed: %v", x.ErrNotSupported)
-	return nil, x.ErrNotSupported
-}
-
-func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
-	glog.Warningf("Backup failed: %v", x.ErrNotSupported)
-	return x.ErrNotSupported
-}
-
-func ProcessListBackups(ctx context.Context, location string, creds *x.MinioCredentials) (
-	[]*Manifest, error) {
-
-	return nil, x.ErrNotSupported
-}
diff --git a/worker/groups.go b/worker/groups.go
index 97ee8cb72a7..7f0ab32ef41 100644
--- a/worker/groups.go
+++ b/worker/groups.go
@@ -1084,11 +1084,6 @@ func GetEEFeaturesList() []string {
 		ee = append(ee, "acl")
 		ee = append(ee, "multi_tenancy")
 	}
-	if x.WorkerConfig.EncryptionKey != nil {
-		ee = append(ee, "encryption_at_rest", "encrypted_backup_restore", "encrypted_export")
-	} else {
-		ee = append(ee, "backup_restore")
-	}
 	if x.WorkerConfig.Audit {
 		ee = append(ee, "audit")
 	}
diff --git a/worker/online_restore.go b/worker/online_restore.go
index e5e0c5b17b3..27e04a37d52 100644
--- a/worker/online_restore.go
+++ b/worker/online_restore.go
@@ -1,13 +1,17 @@
-// +build !oss
-
 /*
- * Copyright 2020 Dgraph Labs, Inc. and Contributors
+ * Copyright 2021 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * Licensed under the Dgraph Community License (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *     https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package worker
diff --git a/worker/online_restore_oss.go b/worker/online_restore_oss.go
deleted file mode 100644
index ce1752d4ae1..00000000000
--- a/worker/online_restore_oss.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// +build oss
-
-/*
- * Copyright 2020 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package worker
-
-import (
-	"context"
-	"sync"
-
-	"github.com/dgraph-io/dgraph/protos/pb"
-	"github.com/dgraph-io/dgraph/x"
-	"github.com/golang/glog"
-)
-
-func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest, wg *sync.WaitGroup) error {
-	glog.Warningf("Restore failed: %v", x.ErrNotSupported)
-	return x.ErrNotSupported
-}
-
-// Restore implements the Worker interface.
-func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.Status, error) {
-	glog.Warningf("Restore failed: %v", x.ErrNotSupported)
-	return &pb.Status{}, x.ErrNotSupported
-}
-
-func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uint64) error {
-	return nil
-}
diff --git a/worker/restore_map.go b/worker/restore_map.go
index 2bdd0bde6d1..90cf63250c6 100644
--- a/worker/restore_map.go
+++ b/worker/restore_map.go
@@ -1,13 +1,17 @@
-// +build !oss
-
 /*
  * Copyright 2021 Dgraph Labs, Inc. and Contributors
  *
- * Licensed under the Dgraph Community License (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *     https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package worker
diff --git a/worker/restore_reduce.go b/worker/restore_reduce.go
index db44570c030..80adc80dd05 100644
--- a/worker/restore_reduce.go
+++ b/worker/restore_reduce.go
@@ -1,13 +1,17 @@
-// +build !oss
-
 /*
- * Copyright 2019 Dgraph Labs, Inc. and Contributors
+ * Copyright 2021 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * Licensed under the Dgraph Community License (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *     https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package worker
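
Note on the drop-operation records handled by checkAndGetDropOp in the moved code above: a dgraph.drop.op value is a single string of the form KIND;payload, where the payload is empty for DROP_ALL, a namespace for DROP_DATA and DROP_NS, and an attribute name for DROP_ATTR. The following is a minimal, self-contained sketch of that parsing contract; DropKind and parseDropOp are hypothetical stand-ins invented for this illustration (the real code populates a *pb.DropOperation), and unlike the original switch this sketch also rejects unknown kinds.

package main

import (
	"fmt"
	"strings"
)

// DropKind stands in for pb.DropOperation_DropOp in this sketch.
type DropKind int

const (
	DropAll DropKind = iota
	DropData
	DropAttr
	DropNS
)

// parseDropOp mirrors the switch in checkAndGetDropOp: the record value is
// always "KIND;payload". It returns the kind and the payload (namespace or
// attribute name, empty for DROP_ALL).
func parseDropOp(val string) (DropKind, string, error) {
	parts := strings.SplitN(val, ";", 2)
	if len(parts) != 2 {
		return 0, "", fmt.Errorf("unexpected value: %q for dgraph.drop.op", val)
	}
	switch parts[0] {
	case "DROP_ALL":
		return DropAll, "", nil
	case "DROP_DATA":
		return DropData, parts[1], nil // payload is a namespace.
	case "DROP_ATTR":
		return DropAttr, parts[1], nil // payload is an attribute name.
	case "DROP_NS":
		return DropNS, parts[1], nil // payload is a namespace.
	default:
		return 0, "", fmt.Errorf("unknown drop kind: %q", parts[0])
	}
}

func main() {
	for _, v := range []string{"DROP_ALL;", "DROP_ATTR;name", "DROP_NS;2"} {
		kind, payload, err := parseDropOp(v)
		fmt.Println(kind, payload, err)
	}
}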
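
The moved writeKVList also fixes the framing of a backup stream: each marshaled KVList is preceded by its byte length as a little-endian uint64, which is what the restore path relies on to read size-prefixed chunks back off the stream. Below is a minimal sketch of that framing under stated assumptions: writeFramed is a hypothetical stand-in, and a plain byte slice replaces the list.Marshal() output.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFramed writes payload prefixed by its length as a little-endian
// uint64, mirroring the framing used by the removed writeKVList.
func writeFramed(w io.Writer, payload []byte) error {
	if err := binary.Write(w, binary.LittleEndian, uint64(len(payload))); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

func main() {
	var buf bytes.Buffer
	if err := writeFramed(&buf, []byte("example-kvlist-bytes")); err != nil {
		panic(err)
	}
	// The first 8 bytes encode the payload length (20 here), little-endian.
	fmt.Printf("% x\n", buf.Bytes()[:8])
}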