diff --git a/worker/backup_common.go b/worker/backup_common.go
deleted file mode 100755
index c821a988ee0..00000000000
--- a/worker/backup_common.go
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright 2020 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package worker
-
-import (
-	"context"
-	"math"
-	"sync"
-
-	"github.com/dgraph-io/badger/v3"
-	"github.com/pkg/errors"
-
-	"github.com/dgraph-io/dgraph/protos/pb"
-	"github.com/dgraph-io/dgraph/x"
-)
-
-// predicateSet is a map whose keys are predicates. It is meant to be used as a set.
-type predicateSet map[string]struct{}
-
-// Manifest records backup details. These values are used during restore.
-// ReadTs will be used to create the next incremental backup.
-// Groups are the IDs of the groups involved.
-type Manifest struct {
-	sync.Mutex
-	// Type is the type of backup, either full or incremental.
-	Type string `json:"type"`
-	// SinceTsDeprecated is kept for backward compatibility. Use readTs instead of sinceTs.
-	SinceTsDeprecated uint64 `json:"since"`
-	// ReadTs is the timestamp at which this backup was taken. This would be
-	// the since timestamp for the next incremental backup.
-	ReadTs uint64 `json:"read_ts"`
-	// Groups is the map of valid groups to predicates at the time the backup was created.
-	Groups map[uint32][]string `json:"groups"`
-	// BackupId is a unique ID assigned to all the backups in the same series
-	// (from the first full backup to the last incremental backup).
-	BackupId string `json:"backup_id"`
-	// BackupNum is a monotonically increasing number assigned to each backup in
-	// a series. The full backup has BackupNum equal to one and each incremental
-	// backup gets assigned the next available number. Used to verify the integrity
-	// of the data during a restore.
-	BackupNum uint64 `json:"backup_num"`
-	// Version specifies the Dgraph version the backup was taken on. For a backup taken
-	// on an older version (<= 20.11), the predicates in the Groups map do not have a
-	// namespace. Version will be zero for older versions.
-	Version int `json:"version"`
-	// Path is the name of the backup directory to which this manifest belongs.
-	Path string `json:"path"`
-	// Encrypted indicates whether this backup was encrypted or not.
-	Encrypted bool `json:"encrypted"`
-	// DropOperations lists the various DROP operations that took place since the last backup.
-	// These are used during restore to redo those operations before applying the backup.
-	DropOperations []*pb.DropOperation `json:"drop_operations"`
-	// Compression keeps track of the compression that was used for the data.
-	Compression string `json:"compression"`
-}
-
-// ValidReadTs returns the valid read timestamp. The backup can have readTs=0
-// if the backup was done on an older version of Dgraph. The SinceTsDeprecated
-// field is kept for backward compatibility.
-func (m *Manifest) ValidReadTs() uint64 {
-	if m.ReadTs == 0 {
-		return m.SinceTsDeprecated
-	}
-	return m.ReadTs
-}
-
-type MasterManifest struct {
-	Manifests []*Manifest
-}
-
-func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {
-	preds, ok := m.Groups[gid]
-	if !ok {
-		return nil
-	}
-
-	predSet := make(predicateSet)
-	for _, pred := range preds {
-		if m.Version == 0 {
-			// For older versions, the preds set contains attributes without a namespace.
-			pred = x.NamespaceAttr(x.GalaxyNamespace, pred)
-		}
-		predSet[pred] = struct{}{}
-	}
-	return predSet
-}
-
-// GetCredentialsFromRequest extracts the credentials from a backup request.
-func GetCredentialsFromRequest(req *pb.BackupRequest) *x.MinioCredentials {
-	return &x.MinioCredentials{
-		AccessKey:    req.GetAccessKey(),
-		SecretKey:    req.GetSecretKey(),
-		SessionToken: req.GetSessionToken(),
-		Anonymous:    req.GetAnonymous(),
-	}
-}
-
-func StoreExport(request *pb.ExportRequest, dir string, key x.Sensitive) error {
-	db, err := badger.OpenManaged(badger.DefaultOptions(dir).
-		WithSyncWrites(false).
-		WithValueThreshold(1 << 10).
-		WithNumVersionsToKeep(math.MaxInt32).
-		WithEncryptionKey(key))
-
-	if err != nil {
-		return err
-	}
-
-	_, err = exportInternal(context.Background(), request, db, true)
-	// It is important to close the db before returning the error. Else, we will
-	// see a memory leak.
-	db.Close()
-	return errors.Wrapf(err, "cannot export data inside DB at %s", dir)
-}
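For reference, a minimal, self-contained sketch (not part of this diff) of how the manifest JSON described above decodes, and how the read timestamp falls back to the deprecated `since` field. The struct here mirrors only a subset of `Manifest`'s fields:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// manifest mirrors a subset of worker.Manifest's JSON shape.
type manifest struct {
	Type              string `json:"type"`
	SinceTsDeprecated uint64 `json:"since"`
	ReadTs            uint64 `json:"read_ts"`
	BackupId          string `json:"backup_id"`
	BackupNum         uint64 `json:"backup_num"`
}

// validReadTs follows the same fallback as Manifest.ValidReadTs above:
// pre-20.11 manifests only set "since", newer ones set "read_ts".
func (m *manifest) validReadTs() uint64 {
	if m.ReadTs == 0 {
		return m.SinceTsDeprecated
	}
	return m.ReadTs
}

func main() {
	data := []byte(`{"type":"full","since":0,"read_ts":2280,"backup_id":"ab","backup_num":1}`)
	var m manifest
	if err := json.Unmarshal(data, &m); err != nil {
		panic(err)
	}
	// Prints 2280: the "since" timestamp for the next incremental backup.
	fmt.Println(m.validReadTs())
}
```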
diff --git a/worker/backup_handler.go b/worker/backup_handler.go
deleted file mode 100755
index e65c82fb108..00000000000
--- a/worker/backup_handler.go
+++ /dev/null
@@ -1,307 +0,0 @@
-//go:build !oss
-// +build !oss
-
-/*
- * Copyright 2018 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Dgraph Community License (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
- *
- *     https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
- */
-
-package worker
-
-import (
-	"fmt"
-	"io"
-	"net/url"
-	"sort"
-
-	"github.com/dgraph-io/dgraph/protos/pb"
-	"github.com/dgraph-io/dgraph/x"
-
-	"github.com/pkg/errors"
-)
-
-const (
-	// backupPathFmt defines the path to store or index backup objects.
-	// The expected parameter is a date in string format.
-	backupPathFmt = `dgraph.%s`
-
-	// backupNameFmt defines the name of backup files or objects (remote).
-	// The first parameter is the read timestamp at the time of backup. This is used for
-	// incremental backups and partial restore.
-	// The second parameter is the group ID when the backup happened. This is used for
-	// partitioning the posting directories 'p' during restore.
-	backupNameFmt = `r%d-g%d.backup`
-
-	// backupManifest is the name of backup manifests. This is a JSON file that contains
-	// the details of the backup. A backup dir without a manifest is ignored.
-	//
-	// Example manifest:
-	// {
-	//   "since": 2280,
-	//   "groups": [ 1, 2, 3 ],
-	// }
-	//
-	// "since" is the read timestamp used at the backup request. This value is called
-	// "since" because it is used by subsequent incremental backups.
-	// "groups" are the group IDs that participated.
-	backupManifest = `manifest.json`
-
-	tmpManifest = `manifest_tmp.json`
-)
-
-// UriHandler interface is implemented by URI scheme handlers.
-// When adding new scheme handlers, for example 'azure://', an object will implement
-// this interface to supply Dgraph with a way to create or load backup files into the DB.
-// For all methods below, the URL object is parsed as described in `NewUriHandler` and
-// the Processor object has the DB, estimated tablet sizes, and backup parameters.
-type UriHandler interface {
-	// Handlers must know how to Write to their URI location.
-	// These function calls are used by both Create and Load.
-	io.WriteCloser
-
-	// BytesWritten returns the number of bytes written.
-	BytesWritten() int
-
-	// GetManifest returns the master manifest, containing information about all the
-	// backups. If the backup directory is using old formats (version < 21.03) of
-	// manifests, it returns a consolidated master manifest.
-	GetManifest(*url.URL) (*MasterManifest, error)
-
-	// GetManifests returns the list of manifests for the given backup series ID
-	// and backup number at the specified location. If backupNum is set to zero,
-	// all the manifests for the backup series will be returned. If it's greater
-	// than zero, manifests from one to backupNum will be returned.
-	GetManifests(*url.URL, string, uint64) ([]*Manifest, error)
-
-	// GetLatestManifest reads the manifests at the given URL and returns the
-	// latest manifest.
-	GetLatestManifest(*url.URL) (*Manifest, error)
-
-	// CreateBackupFile prepares the object or file to save the backup file.
-	CreateBackupFile(*url.URL, *pb.BackupRequest) error
-
-	// CreateManifest creates the given manifest.
-	CreateManifest(*url.URL, *MasterManifest) error
-
-	// Load will scan the location URI for backup files, then load them via loadFn.
-	// It optionally takes the name of the last directory to consider. Any backup
-	// directories created after it will be ignored.
-	// Objects implementing this function will be used for retrieving (downloading)
-	// backup files and loading the data into a DB. The restore CLI command uses this call.
-	Load(*url.URL, string, uint64, loadFn) LoadResult
-
-	// Verify checks that the specified backup can be restored to a cluster with the
-	// given groups. The last manifest of that backup should have the same number of
-	// groups as the given list of groups.
-	Verify(*url.URL, *pb.RestoreRequest, []uint32) error
-}
-
-// NewUriHandler parses the requested URI and finds the corresponding UriHandler.
-// If the passed credentials are not nil, they will be used to override the
-// default credentials (only for backups to minio or S3).
-// Target URI formats:
-//   [scheme]://[host]/[path]?[args]
-//   [scheme]:///[path]?[args]
-//   /[path]?[args] (only for local or NFS)
-//
-// Target URI parts:
-//   scheme - service handler, one of: "file", "s3", "minio"
-//   host   - remote address. ex: "dgraph.s3.amazonaws.com"
-//   path   - directory, bucket or container at target. ex: "/dgraph/backups/"
-//   args   - specific arguments that are ok to appear in logs.
-//
-// Global args (if supported by the handler):
-//   secure   - true|false turn on/off TLS.
-//   trace    - true|false turn on/off HTTP tracing.
-//   compress - true|false turn on/off data compression.
-//   encrypt  - true|false turn on/off data encryption.
-//
-// Examples:
-//   s3://dgraph.s3.amazonaws.com/dgraph/backups?secure=true
-//   minio://localhost:9000/dgraph?secure=true
-//   file:///tmp/dgraph/backups
-//   /tmp/dgraph/backups?compress=gzip
-func NewUriHandler(uri *url.URL, creds *x.MinioCredentials) (UriHandler, error) {
-	switch uri.Scheme {
-	case "file", "":
-		return &fileHandler{}, nil
-	case "minio", "s3":
-		return NewS3Handler(uri, creds)
-	}
-	return nil, errors.Errorf("Unable to handle url: %s", uri)
-}
-
-// loadFn is a function that will receive the current file being read.
-// A reader, the backup groupId, and a map whose keys are the predicates to restore
-// are passed as arguments.
-type loadFn func(groupId uint32, in *loadBackupInput) (uint64, uint64, error)
-
-// LoadBackup will scan the given location for backup files in the given backup series
-// and load them sequentially. Returns the maximum Since value on success, otherwise
-// an error.
-func LoadBackup(location, backupId string, backupNum uint64, creds *x.MinioCredentials,
-	fn loadFn) LoadResult {
-	uri, err := url.Parse(location)
-	if err != nil {
-		return LoadResult{Err: err}
-	}
-
-	h, err := NewUriHandler(uri, creds)
-	if err != nil {
-		return LoadResult{Err: errors.Errorf("Unsupported URI: %v", uri)}
-	}
-
-	return h.Load(uri, backupId, backupNum, fn)
-}
-
-// VerifyBackup will access the backup location and verify that the specified backup can
-// be restored to the cluster.
-func VerifyBackup(req *pb.RestoreRequest, creds *x.MinioCredentials, currentGroups []uint32) error {
-	uri, err := url.Parse(req.GetLocation())
-	if err != nil {
-		return err
-	}
-
-	h, err := NewUriHandler(uri, creds)
-	if err != nil {
-		return errors.Wrap(err, "VerifyBackup")
-	}
-
-	return h.Verify(uri, req, currentGroups)
-}
-
-// ListBackupManifests scans location l for backup files and returns the list of manifests.
-func ListBackupManifests(l string, creds *x.MinioCredentials) ([]*Manifest, error) {
-	uri, err := url.Parse(l)
-	if err != nil {
-		return nil, err
-	}
-
-	h, err := NewUriHandler(uri, creds)
-	if err != nil {
-		return nil, errors.Wrap(err, "ListBackupManifests")
-	}
-
-	m, err := h.GetManifest(uri)
-	if err != nil {
-		return nil, err
-	}
-	return m.Manifests, nil
-}
-
-// filterManifests takes a list of manifests and returns the list of manifests
-// that should be considered during a restore.
-func filterManifests(manifests []*Manifest, backupId string) ([]*Manifest, error) {
-	// Go through the files in reverse order and stop when the latest full backup is found.
-	var filteredManifests []*Manifest
-	for i := len(manifests) - 1; i >= 0; i-- {
-		// If backupId is not empty, skip all the manifests that do not match the given
-		// backupId. If it's empty, do not skip any manifests as the default behavior is
-		// to restore the latest series of backups.
-		if len(backupId) > 0 && manifests[i].BackupId != backupId {
-			continue
-		}
-
-		filteredManifests = append(filteredManifests, manifests[i])
-		if manifests[i].Type == "full" {
-			break
-		}
-	}
-
-	// Reverse the filtered list since the original iteration happened in reverse.
-	for i := len(filteredManifests)/2 - 1; i >= 0; i-- {
-		opp := len(filteredManifests) - 1 - i
-		filteredManifests[i], filteredManifests[opp] = filteredManifests[opp], filteredManifests[i]
-	}
-
-	if err := verifyManifests(filteredManifests); err != nil {
-		return nil, err
-	}
-
-	return filteredManifests, nil
-}
-
-func verifyManifests(manifests []*Manifest) error {
-	if len(manifests) == 0 {
-		return nil
-	}
-
-	if manifests[0].BackupNum != 1 {
-		return errors.Errorf("expected a BackupNum value of 1 for first manifest but got %d",
-			manifests[0].BackupNum)
-	}
-
-	backupId := manifests[0].BackupId
-	var backupNum uint64
-	for _, manifest := range manifests {
-		if manifest.BackupId != backupId {
-			return errors.Errorf("found a manifest with backup ID %s but expected %s",
-				manifest.BackupId, backupId)
-		}
-
-		backupNum++
-		if manifest.BackupNum != backupNum {
-			return errors.Errorf("found a manifest with backup number %d but expected %d",
-				manifest.BackupNum, backupNum)
-		}
-	}
-
-	return nil
-}
-
-func backupName(since uint64, groupId uint32) string {
-	return fmt.Sprintf(backupNameFmt, since, groupId)
-}
-
-// verifyRequest verifies that the manifests satisfy the requirements to process the
-// given restore request.
-func verifyRequest(req *pb.RestoreRequest, manifests []*Manifest, currentGroups []uint32) error {
-	if len(manifests) == 0 {
-		return errors.Errorf("No backups with the specified backup ID %s", req.GetBackupId())
-	}
-
-	if err := verifyManifests(manifests); err != nil {
-		return err
-	}
-
-	lastManifest := manifests[len(manifests)-1]
-	if len(currentGroups) != len(lastManifest.Groups) {
-		return errors.Errorf("groups in cluster and latest backup manifest differ")
-	}
-
-	for _, group := range currentGroups {
-		if _, ok := lastManifest.Groups[group]; !ok {
-			return errors.Errorf("groups in cluster and latest backup manifest differ")
-		}
-	}
-	return nil
-}
-
-func getManifests(manifests []*Manifest, backupId string,
-	backupNum uint64) ([]*Manifest, error) {
-
-	manifests, err := filterManifests(manifests, backupId)
-	if err != nil {
-		return nil, err
-	}
-
-	// Sort manifests in ascending order of their BackupNum so that the first
-	// manifest corresponds to the first full backup, and so on.
-	sort.Slice(manifests, func(i, j int) bool {
-		return manifests[i].BackupNum < manifests[j].BackupNum
-	})
-
-	if backupNum > 0 {
-		if len(manifests) < int(backupNum) {
-			return nil, errors.Errorf("not enough backups to restore manifest with backupNum %d",
				backupNum)
-		}
-		manifests = manifests[:backupNum]
-	}
-	return manifests, nil
-}
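As a usage sketch (assuming the `worker` and `x` packages as they existed before this deletion), listing the manifest chain at a backup location could look like the following; the location and credential strings are placeholders:

```go
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/dgraph/worker"
	"github.com/dgraph-io/dgraph/x"
)

func main() {
	// Placeholder credentials; for a local file:// location these can be nil.
	creds := &x.MinioCredentials{AccessKey: "<access-key>", SecretKey: "<secret-key>"}

	manifests, err := worker.ListBackupManifests(
		"s3://dgraph.s3.amazonaws.com/dgraph/backups", creds)
	if err != nil {
		log.Fatal(err)
	}

	// Print the series/number chain that verifyManifests checks:
	// BackupNum must run 1, 2, 3, ... within a single BackupId series.
	for _, m := range manifests {
		fmt.Printf("series=%s num=%d type=%s readTs=%d\n",
			m.BackupId, m.BackupNum, m.Type, m.ValidReadTs())
	}
}
```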
diff --git a/worker/backup_handler_test.go b/worker/backup_handler_test.go
deleted file mode 100755
index 5984693047e..00000000000
--- a/worker/backup_handler_test.go
+++ /dev/null
@@ -1,124 +0,0 @@
-//go:build !oss
-// +build !oss
-
-/*
- * Copyright 2018 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Dgraph Community License (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
- *
- *     https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
- */
-
-package worker
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestFilterManifestDefault(t *testing.T) {
-	manifests := []*Manifest{
-		{
-			Type:      "full",
-			BackupId:  "aa",
-			BackupNum: 1,
-		},
-		{
-			Type:      "full",
-			BackupId:  "ab",
-			BackupNum: 1,
-		},
-	}
-	expected := []*Manifest{
-		{
-			Type:      "full",
-			BackupId:  "ab",
-			BackupNum: 1,
-		},
-	}
-	manifests, err := filterManifests(manifests, "")
-	require.NoError(t, err)
-	require.Equal(t, manifests, expected)
-}
-
-func TestFilterManifestSelectSeries(t *testing.T) {
-	manifests := []*Manifest{
-		{
-			Type:      "full",
-			BackupId:  "aa",
-			BackupNum: 1,
-		},
-		{
-			Type:      "full",
-			BackupId:  "ab",
-			BackupNum: 1,
-		},
-	}
-	expected := []*Manifest{
-		{
-			Type:      "full",
-			BackupId:  "aa",
-			BackupNum: 1,
-		},
-	}
-	manifests, err := filterManifests(manifests, "aa")
-	require.NoError(t, err)
-	require.Equal(t, manifests, expected)
-}
-
-func TestFilterManifestMissingBackup(t *testing.T) {
-	manifests := []*Manifest{
-		{
-			Type:      "full",
-			BackupId:  "aa",
-			BackupNum: 1,
-		},
-		{
-			Type:      "incremental",
-			BackupId:  "aa",
-			BackupNum: 3,
-		},
-	}
-	_, err := filterManifests(manifests, "aa")
-	require.Error(t, err)
-	require.Contains(t, err.Error(), "found a manifest with backup number")
-}
-
-func TestFilterManifestMissingFirstBackup(t *testing.T) {
-	manifests := []*Manifest{
-		{
-			Type:      "incremental",
-			BackupId:  "aa",
-			BackupNum: 2,
-		},
-		{
-			Type:      "incremental",
-			BackupId:  "aa",
-			BackupNum: 3,
-		},
-	}
-	_, err := filterManifests(manifests, "aa")
-	require.Error(t, err)
-	require.Contains(t, err.Error(), "expected a BackupNum value of 1 for first manifest")
-}
-
-func TestFilterManifestDifferentSeries(t *testing.T) {
-	manifests := []*Manifest{
-		{
-			Type:      "full",
-			BackupId:  "aa",
-			BackupNum: 1,
-		},
-		{
-			Type:      "incremental",
-			BackupId:  "ab",
-			BackupNum: 2,
-		},
-	}
-	_, err := filterManifests(manifests, "")
-	require.Error(t, err)
-	require.Contains(t, err.Error(), "found a manifest with backup ID")
-}
diff --git a/worker/restore.go b/worker/restore.go
deleted file mode 100755
index b55574d64dc..00000000000
--- a/worker/restore.go
+++ /dev/null
@@ -1,317 +0,0 @@
-//go:build !oss
-// +build !oss
-
-/*
- * Copyright 2019 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Dgraph Community License (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
- *
- *     https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
- */
-
-package worker
-
-import (
-	"bufio"
-	"compress/gzip"
-	"encoding/binary"
-	"encoding/hex"
-	"fmt"
-	"io"
-	"math"
-	"os"
-	"path/filepath"
-	"strconv"
-
-	"github.com/dgraph-io/badger/v3"
-	"github.com/dgraph-io/badger/v3/options"
-	bpb "github.com/dgraph-io/badger/v3/pb"
-	"github.com/golang/glog"
-	"github.com/golang/snappy"
-	"github.com/pkg/errors"
-
-	"github.com/dgraph-io/dgraph/codec"
-	"github.com/dgraph-io/dgraph/ee/enc"
-	"github.com/dgraph-io/dgraph/posting"
-	"github.com/dgraph-io/dgraph/protos/pb"
-	"github.com/dgraph-io/dgraph/x"
-)
-
-// RunRestore scans the backup location via LoadBackup and tries to load the data
-// into a new DB.
-func RunRestore(pdir, location, backupId string, key x.Sensitive,
-	ctype options.CompressionType, clevel int) LoadResult {
-	// Create the pdir if it doesn't exist.
-	if err := os.MkdirAll(pdir, 0700); err != nil {
-		return LoadResult{Err: err}
-	}
-
-	// Scan the location for backup files and load them. Each file represents a
-	// node group, and we create a new p dir for each.
-	return LoadBackup(location, backupId, 0, nil,
-		func(groupId uint32, in *loadBackupInput) (uint64, uint64, error) {
-			bReader, err := in.getReader(key)
-			if err != nil {
-				return 0, 0, errors.Wrap(err, "failed to get reader for restore")
-			}
-			dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
-			if !pathExist(dir) {
-				fmt.Println("Creating new db:", dir)
-			}
-			// The badger DB should be opened only after creating the backup
-			// file reader and verifying the encryption in the backup file.
-			db, err := badger.OpenManaged(badger.DefaultOptions(dir).
-				WithCompression(ctype).
-				WithZSTDCompressionLevel(clevel).
-				WithSyncWrites(false).
-				WithBlockCacheSize(100 * (1 << 20)).
-				WithIndexCacheSize(100 * (1 << 20)).
-				WithNumVersionsToKeep(math.MaxInt32).
-				WithEncryptionKey(key).
-				WithNamespaceOffset(x.NamespaceOffset))
-			if err != nil {
-				return 0, 0, err
-			}
-			defer db.Close()
-			maxUid, maxNsId, err := loadFromBackup(db, &loadBackupInput{
-				r:              bReader,
-				restoreTs:      0,
-				preds:          in.preds,
-				dropOperations: in.dropOperations,
-				isOld:          in.isOld,
-				compression:    in.compression,
-			})
-			if err != nil {
-				return 0, 0, errors.Wrap(err, "loadFromBackup failed")
-			}
-			return maxUid, maxNsId, x.WriteGroupIdFile(dir, uint32(groupId))
-		})
-}
-
-type loadBackupInput struct {
-	r              io.Reader
-	restoreTs      uint64
-	preds          predicateSet
-	dropOperations []*pb.DropOperation
-	isOld          bool
-	compression    string
-}
-
-func (l *loadBackupInput) getReader(key x.Sensitive) (io.Reader, error) {
-	r, err := enc.GetReader(key, l.r)
-	if err != nil {
-		return nil, err
-	}
-	switch l.compression {
-	case "":
-		gzReader, err := gzip.NewReader(r)
-		if err != nil && len(key) != 0 {
-			err = errors.Wrap(err,
-				"Unable to read the backup. Ensure the encryption key is correct.")
-		}
-		return gzReader, err
-	case "snappy":
-		// Snappy doesn't return an error. If the data is encrypted, we will
-		// get an error while reading it.
-		return snappy.NewReader(r), nil
-	default:
-		return nil, errors.Errorf("Invalid compression in backup %q", l.compression)
-	}
-}
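A hedged sketch of driving `RunRestore` above; the target directory and backup location are placeholders, and `options.Snappy` is one plausible choice that should match the manifest's `compression` field:

```go
package main

import (
	"log"

	"github.com/dgraph-io/badger/v3/options"

	"github.com/dgraph-io/dgraph/worker"
)

func main() {
	// Restore the latest backup series at the (placeholder) location into
	// ./p-restored. An empty backupId selects the latest series; a nil key
	// means the backup is unencrypted; clevel only matters for ZSTD.
	res := worker.RunRestore("./p-restored", "/tmp/dgraph/backups", "",
		nil, options.Snappy, 0)
	if res.Err != nil {
		log.Fatal(res.Err)
	}
}
```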
-// loadFromBackup reads the backup, converts the keys and values to the required format,
-// and loads them into the given badger DB. The set of predicates is used to avoid
-// restoring values from predicates no longer assigned to this group.
-// If restoreTs is greater than zero, the key-value pairs will be written with that
-// timestamp. Otherwise, the original version is used.
-// TODO(DGRAPH-1234): Check whether restoreTs can be removed.
-func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) {
-	br := bufio.NewReaderSize(in.r, 16<<10)
-	unmarshalBuf := make([]byte, 1<<10)
-
-	// If there were any DROP operations that need to be applied before loading the
-	// backup into the db, then apply them here.
-	if err := applyDropOperationsBeforeRestore(db, in.dropOperations, in.isOld); err != nil {
-		return 0, 0, errors.Wrapf(err, "cannot apply DROP operations while loading backup")
-	}
-
-	// Delete schemas and types. Each backup file should have a complete copy of the schema.
-	if err := db.DropPrefix([]byte{x.ByteSchema}); err != nil {
-		return 0, 0, err
-	}
-	if err := db.DropPrefix([]byte{x.ByteType}); err != nil {
-		return 0, 0, err
-	}
-
-	loader := db.NewKVLoader(16)
-	var maxUid, maxNsId uint64
-	for {
-		var sz uint64
-		err := binary.Read(br, binary.LittleEndian, &sz)
-		if err == io.EOF {
-			break
-		} else if err != nil {
-			return 0, 0, errors.Wrap(err, "read failed")
-		}
-
-		if cap(unmarshalBuf) < int(sz) {
-			unmarshalBuf = make([]byte, sz)
-		}
-
-		if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
-			return 0, 0, err
-		}
-
-		list := &bpb.KVList{}
-		if err := list.Unmarshal(unmarshalBuf[:sz]); err != nil {
-			return 0, 0, err
-		}
-
-		for _, kv := range list.Kv {
-			if len(kv.GetUserMeta()) != 1 {
-				return 0, 0, errors.Errorf(
-					"Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key))
-			}
-
-			restoreKey, namespace, err := fromBackupKey(kv.Key)
-			if err != nil {
-				return 0, 0, err
-			}
-
-			// Filter keys using the preds set. Do not do this filtering for type keys
-			// as they are meant to be in every group and their Attr value does not
-			// match a predicate name.
-			parsedKey, err := x.Parse(restoreKey)
-			if err != nil {
-				return 0, 0, errors.Wrapf(err, "could not parse key %s", hex.Dump(restoreKey))
-			}
-			if _, ok := in.preds[parsedKey.Attr]; !parsedKey.IsType() && !ok {
-				continue
-			}
-
-			// Update the max uid and namespace id seen while restoring this backup.
-			maxUid = x.Max(maxUid, parsedKey.Uid)
-			maxNsId = x.Max(maxNsId, namespace)
-
-			// Override the version if requested. Should not be done for type and
-			// schema predicates, which always have their version set to 1.
-			if in.restoreTs > 0 && !parsedKey.IsSchema() && !parsedKey.IsType() {
-				kv.Version = in.restoreTs
-			}
-
-			switch kv.GetUserMeta()[0] {
-			case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
-				backupPl := &pb.BackupPostingList{}
-				if err := backupPl.Unmarshal(kv.Value); err != nil {
-					return 0, 0, errors.Wrapf(err, "while reading backup posting list")
-				}
-				pl := posting.FromBackupPostingList(backupPl)
-				shouldSplit := pl.Size() >= (1<<20)/2 && len(pl.Pack.Blocks) > 1
-
-				if !shouldSplit || parsedKey.HasStartUid || len(pl.GetSplits()) > 0 {
-					// This covers two cases.
-					// 1. The list is not big enough to be split.
-					// 2. This key is storing part of a multi-part list. Write each
-					//    individual part without rolling the key first. This part is
-					//    here for backwards compatibility. New backups are not affected
-					//    because there was a change to roll up lists into a single one.
-					newKv := posting.MarshalPostingList(pl, nil)
-					codec.FreePack(pl.Pack)
-					newKv.Key = restoreKey
-					// Use the version of the KV before we marshalled the posting
-					// list. The MarshalPostingList function returns a KV with a
-					// zero version.
-					newKv.Version = kv.Version
-					if err := loader.Set(newKv); err != nil {
-						return 0, 0, err
-					}
-				} else {
-					// This is a complete list. It should be rolled up to avoid writing
-					// a list that is too big to be read back from disk.
-					// Rollup will take ownership of the Pack and will free the memory.
-					l := posting.NewList(restoreKey, pl, kv.Version)
-					kvs, err := l.Rollup(nil)
-					if err != nil {
-						// TODO: wrap errors in this file for easier debugging.
-						return 0, 0, err
-					}
-					for _, kv := range kvs {
-						if err := loader.Set(kv); err != nil {
-							return 0, 0, err
-						}
-					}
-				}
-
-			case posting.BitSchemaPosting:
-				appendNamespace := func() error {
-					// If the backup was taken on an old version, we need to append the
-					// namespace to the fields of TypeUpdate.
-					var update pb.TypeUpdate
-					if err := update.Unmarshal(kv.Value); err != nil {
-						return err
-					}
-					update.TypeName = x.GalaxyAttr(update.TypeName)
-					for _, sch := range update.Fields {
-						sch.Predicate = x.GalaxyAttr(sch.Predicate)
-					}
-					kv.Value, err = update.Marshal()
-					return err
-				}
-				if in.isOld && parsedKey.IsType() {
-					if err := appendNamespace(); err != nil {
-						glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
-						continue
-					}
-				}
-				// Schema and type keys are not stored in an intermediate format, so
-				// their value can be written as is.
-				kv.Key = restoreKey
-				if err := loader.Set(kv); err != nil {
-					return 0, 0, err
-				}
-
-			default:
-				return 0, 0, errors.Errorf(
-					"Unexpected meta %d for key %s", kv.UserMeta[0], hex.Dump(kv.Key))
-			}
-		}
-	}
-
-	if err := loader.Finish(); err != nil {
-		return 0, 0, err
-	}
-
-	return maxUid, maxNsId, nil
-}
-
-func applyDropOperationsBeforeRestore(
-	db *badger.DB, dropOperations []*pb.DropOperation, isOld bool) error {
-	for _, operation := range dropOperations {
-		switch operation.DropOp {
-		case pb.DropOperation_ALL:
-			return db.DropAll()
-		case pb.DropOperation_DATA:
-			return db.DropPrefix([]byte{x.DefaultPrefix})
-		case pb.DropOperation_ATTR:
-			attr := operation.DropValue
-			if isOld {
-				attr = x.GalaxyAttr(operation.DropValue)
-			}
-			return db.DropPrefix(x.PredicatePrefix(attr))
-		case pb.DropOperation_NS:
-			ns, err := strconv.ParseUint(operation.DropValue, 0, 64)
-			x.Check(err)
-			return db.BanNamespace(ns)
-		}
-	}
-	return nil
-}
-
-func fromBackupKey(key []byte) ([]byte, uint64, error) {
-	backupKey := &pb.BackupKey{}
-	if err := backupKey.Unmarshal(key); err != nil {
-		return nil, 0, errors.Wrapf(err, "while reading backup key %s", hex.Dump(key))
-	}
-	return x.FromBackupKey(backupKey), backupKey.Namespace, nil
-}
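For clarity, a standalone sketch of the stream framing that `loadFromBackup` consumes: each record is an 8-byte little-endian length followed by a marshalled `bpb.KVList`. The payload is stubbed with raw bytes here so the sketch stays self-contained; only the framing mirrors the read loop above:

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeRecord emits one length-prefixed record: an 8-byte little-endian
// size followed by the payload (a marshalled KVList in a real backup).
func writeRecord(w io.Writer, payload []byte) error {
	if err := binary.Write(w, binary.LittleEndian, uint64(len(payload))); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readRecords replays the decode loop from loadFromBackup: read a size,
// then read exactly that many payload bytes, until a clean EOF.
func readRecords(r io.Reader) ([][]byte, error) {
	br := bufio.NewReaderSize(r, 16<<10)
	var out [][]byte
	for {
		var sz uint64
		err := binary.Read(br, binary.LittleEndian, &sz)
		if err == io.EOF {
			break // clean end of stream
		} else if err != nil {
			return nil, err
		}
		buf := make([]byte, sz)
		if _, err := io.ReadFull(br, buf); err != nil {
			return nil, err
		}
		out = append(out, buf)
	}
	return out, nil
}

func main() {
	var b bytes.Buffer
	_ = writeRecord(&b, []byte("kvlist-1"))
	_ = writeRecord(&b, []byte("kvlist-2"))
	recs, err := readRecords(&b)
	fmt.Println(len(recs), err) // 2 <nil>
}
```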