Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update the upgrade tool for CORS (depreciated predicates) change #7486

Merged
merged 23 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5d6ab46
s3 handler for export backup
aman-bansal Feb 18, 2021
bd364b8
fixing leaks and bugs with export
aman-bansal Feb 18, 2021
18a62c5
fixing s3 restore
aman-bansal Feb 19, 2021
1b5cb7f
fixing export input option
aman-bansal Feb 19, 2021
92528db
adding cors to the schema
aman-bansal Feb 19, 2021
34e5277
fixing cors export
aman-bansal Feb 22, 2021
eb158cf
finalizing contract and everything
aman-bansal Feb 22, 2021
e7a0eaa
removing redundant
aman-bansal Feb 22, 2021
1b69747
adding comments to simplify
aman-bansal Feb 22, 2021
ff7bf34
refactoring export code a bit
aman-bansal Feb 23, 2021
894771d
making file handler, s3 handler refactor + export backup to work cons…
aman-bansal Feb 23, 2021
5bef5ab
fix the cors issue in backup backward compatibility
NamanJain8 Feb 25, 2021
08cfa22
address comments
NamanJain8 Mar 2, 2021
ea0e9cd
Merge branch 'master' into naman/export-backup
NamanJain8 Mar 2, 2021
2a7ed62
Merge branch 'master' into naman/export-backup
NamanJain8 Mar 3, 2021
5e3d2e5
basic working
NamanJain8 Mar 4, 2021
1a1827e
add support for TLS and slash in upgrade tool
NamanJain8 Mar 10, 2021
95d964d
fix export_backup tool for fixing cors
NamanJain8 Mar 14, 2021
29bc90d
Merge branch 'master' into naman/export-backup
NamanJain8 Mar 15, 2021
a909a27
clean up
NamanJain8 Mar 15, 2021
72b1887
Merge branch 'naman/export-backup' of github.com:dgraph-io/dgraph int…
NamanJain8 Mar 15, 2021
f92d558
add persistent query fix
NamanJain8 Mar 16, 2021
813d0aa
address comments
NamanJain8 Mar 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/dgraph-io/badger/v3/options"

"google.golang.org/grpc/credentials"

"github.com/dgraph-io/dgraph/ee/enc"
Expand Down Expand Up @@ -320,7 +326,8 @@ func initExportBackup() {

flag := ExportBackup.Cmd.Flags()
flag.StringVarP(&opt.location, "location", "l", "",
"Sets the location of the backup. Only file URIs are supported for now.")
`Sets the location of the backup. Both file URIs and s3 are supported.
This command will take care of all the full + incremental backups present in the location.`)
flag.StringVarP(&opt.destination, "destination", "d", "",
"The folder to which export the backups.")
flag.StringVarP(&opt.format, "format", "f", "rdf",
Expand All @@ -334,6 +341,63 @@ func runExportBackup() error {
return err
}

exporter := worker.BackupExporter{}
return exporter.ExportBackup(opt.location, opt.destination, opt.format, opt.key)
if opt.format != "json" && opt.format != "rdf" {
return errors.Errorf("invalid format %s", opt.format)
}
// Create exportDir and temporary folder to store the restored backup.
exportDir, err := filepath.Abs(opt.destination)
if err != nil {
return errors.Wrapf(err, "cannot convert path %s to absolute path", exportDir)
}
if err := os.MkdirAll(exportDir, 0755); err != nil {
return errors.Wrapf(err, "cannot create dir %s", exportDir)
}
tmpDir, err := ioutil.TempDir("", "export_backup")
if err != nil {
return errors.Wrapf(err, "cannot create temp dir")
}

restore := worker.RunRestore(tmpDir, opt.location, "", opt.key, options.None, 0)
if restore.Err != nil {
return restore.Err
}

files, err := ioutil.ReadDir(tmpDir)
if err != nil {
return err
}
// Export the data from the p directories produced by the last step.
ch := make(chan error, len(files))
for _, f := range files {
if !f.IsDir() {
continue
}

dir := filepath.Join(filepath.Join(tmpDir, f.Name()))
gid, err := strconv.ParseUint(strings.TrimPrefix(f.Name(), "p"), 32, 10)
if err != nil {
ch <- errors.Wrapf(err, "cannot export data inside DB at %s", dir)
}
go worker.StoreExport(&pb.ExportRequest{
GroupId: uint32(gid),
ReadTs: restore.Version,
UnixTs: time.Now().Unix(),
Format: opt.format,
Destination: exportDir,
}, dir, opt.key, ch)
}

for i := 0; i < len(files); i++ {
err := <-ch
if err != nil {
return err
}
}

// Clean up temporary directory.
if err := os.RemoveAll(tmpDir); err != nil {
return errors.Wrapf(err, "cannot remove temp directory at %s", tmpDir)
}

return nil
}
27 changes: 27 additions & 0 deletions worker/backup_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
package worker

import (
"context"
"math"
"sync"

"github.com/dgraph-io/badger/v3"
"github.com/pkg/errors"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)
Expand Down Expand Up @@ -95,3 +100,25 @@ func GetCredentialsFromRequest(req *pb.BackupRequest) *x.MinioCredentials {
Anonymous: req.GetAnonymous(),
}
}

func StoreExport(request *pb.ExportRequest, dir string,
key x.SensitiveByteSlice, ch chan error) {

db, err := badger.OpenManaged(badger.DefaultOptions(dir).
WithSyncWrites(false).
WithValueThreshold(1 << 10).
WithNumVersionsToKeep(math.MaxInt32).
WithEncryptionKey(key))

if err != nil {
ch <- errors.Wrapf(err, "cannot open DB at %s", dir)
return
}

_, err = exportInternal(context.Background(), request, db, true)
// It is important to close the db before sending err to ch. Else, we will see a memory
// leak.
db.Close()
ch <- errors.Wrapf(err, "cannot export data inside DB at %s", dir)
return
}
9 changes: 1 addition & 8 deletions worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,14 +679,6 @@ func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB,
}
return listWrap(kv), nil

// below predicates no longer exist internally starting v21.03 but leaving them here
// so that users with a binary with version >= 21.03 can export data from a version < 21.03
// without this internal data showing up.
case e.attr == "dgraph.cors":
case e.attr == "dgraph.graphql.schema_created_at":
case e.attr == "dgraph.graphql.schema_history":
// Ignore these predicates.

case pk.IsData():
e.pl, err = posting.ReadPostingList(key, itr)
if err != nil {
Expand Down Expand Up @@ -863,6 +855,7 @@ func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB,
if err := writePrefix(x.ByteType); err != nil {
return nil, err
}

glog.Infof("Export DONE for group %d at timestamp %d.", in.GroupId, in.ReadTs)
return exportStorage.finishWriting(dataWriter, schemaWriter, gqlSchemaWriter)
}
Expand Down
149 changes: 0 additions & 149 deletions worker/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,15 @@
package worker

import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"

Expand All @@ -41,12 +34,6 @@ type fileHandler struct {
fp *os.File
}

// BackupExporter is an alias of fileHandler so that this struct can be used
// by the export_backup command.
type BackupExporter struct {
fileHandler
}

// readManifest reads a manifest file at path using the handler.
// Returns nil on success, otherwise an error.
func (h *fileHandler) readManifest(path string, m *Manifest) error {
Expand Down Expand Up @@ -265,139 +252,3 @@ func pathExist(path string) bool {
}
return !os.IsNotExist(err) && !os.IsPermission(err)
}

func (h *fileHandler) ExportBackup(backupDir, exportDir, format string,
key x.SensitiveByteSlice) error {
if format != "json" && format != "rdf" {
return errors.Errorf("invalid format %s", format)
}

// Create exportDir and temporary folder to store the restored backup.
var err error
exportDir, err = filepath.Abs(exportDir)
if err != nil {
return errors.Wrapf(err, "cannot convert path %s to absolute path", exportDir)
}
if err := os.MkdirAll(exportDir, 0755); err != nil {
return errors.Wrapf(err, "cannot create dir %s", exportDir)
}
tmpDir, err := ioutil.TempDir("", "export_backup")
if err != nil {
return errors.Wrapf(err, "cannot create temp dir")
}

// Function to load the a single backup file.
loadFn := func(r io.Reader, groupId uint32, preds predicateSet, isOld bool) (uint64, error) {
dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", groupId))

r, err := enc.GetReader(key, r)
if err != nil {
return 0, err
}

gzReader, err := gzip.NewReader(r)
if err != nil {
if len(key) != 0 {
err = errors.Wrap(err,
"Unable to read the backup. Ensure the encryption key is correct.")
}
return 0, errors.Wrapf(err, "cannot create gzip reader")
}
// The badger DB should be opened only after creating the backup
// file reader and verifying the encryption in the backup file.
db, err := badger.OpenManaged(badger.DefaultOptions(dir).
WithSyncWrites(false).
WithNumVersionsToKeep(math.MaxInt32).
WithEncryptionKey(key).
WithNamespaceOffset(x.NamespaceOffset))

if err != nil {
return 0, errors.Wrapf(err, "cannot open DB at %s", dir)
}
defer db.Close()
_, _, err = loadFromBackup(db, &loadBackupInput{
// TODO(Naman): Why is drop operations nil here?
r: gzReader, restoreTs: 0, preds: preds, dropOperations: nil, isOld: isOld,
})
if err != nil {
return 0, errors.Wrapf(err, "cannot load backup")
}
return 0, x.WriteGroupIdFile(dir, uint32(groupId))
}

// Read manifest from folder.
manifest := &Manifest{}
manifestPath := filepath.Join(backupDir, backupManifest)
if err := h.ReadManifest(manifestPath, manifest); err != nil {
return errors.Wrapf(err, "cannot read manifest at %s", manifestPath)
}
manifest.Path = manifestPath
if manifest.Since == 0 || len(manifest.Groups) == 0 {
return errors.Errorf("no data found in backup")
}

// Restore backup to disk.
for gid := range manifest.Groups {
file := filepath.Join(backupDir, backupName(manifest.Since, gid))
fp, err := os.Open(file)
if err != nil {
return errors.Wrapf(err, "cannot open backup file at %s", file)
}
defer fp.Close()

// Only restore the predicates that were assigned to this group at the time
// of the last backup.
predSet := manifest.getPredsInGroup(gid)

_, err = loadFn(fp, gid, predSet, manifest.Version == 0)
if err != nil {
return err
}
}

// Export the data from the p directories produced by the last step.
ch := make(chan error, len(manifest.Groups))
for gid := range manifest.Groups {
go func(group uint32) {
dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", group))
db, err := badger.OpenManaged(badger.DefaultOptions(dir).
WithSyncWrites(false).
WithNumVersionsToKeep(math.MaxInt32).
WithEncryptionKey(key))

if err != nil {
ch <- errors.Wrapf(err, "cannot open DB at %s", dir)
return
}

req := &pb.ExportRequest{
GroupId: group,
ReadTs: manifest.Since,
UnixTs: time.Now().Unix(),
Format: format,
Namespace: math.MaxUint64, // Export all the namespaces.
Destination: exportDir,
}

_, err = exportInternal(context.Background(), req, db, true)
// It is important to close the db before sending err to ch. Else, we will see a memory
// leak.
db.Close()
ch <- errors.Wrapf(err, "cannot export data inside DB at %s", dir)
}(gid)
}

for i := 0; i < len(manifest.Groups); i++ {
err := <-ch
if err != nil {
return err
}
}

// Clean up temporary directory.
if err := os.RemoveAll(tmpDir); err != nil {
return errors.Wrapf(err, "cannot remove temp directory at %s", tmpDir)
}

return nil
}
12 changes: 11 additions & 1 deletion worker/online_restore_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
for _, pred := range preds {
// Force the tablet to be moved to this group, even if it's currently being served
// by another group.
// TODO(Naman): The depreciated predicates removed in fixBackup() will still have tablet
// Add checks to not those tablet here.
if tablet, err := groups().ForceTablet(pred); err != nil {
return errors.Wrapf(err, "cannot create tablet for restored predicate %s", pred)
} else if tablet.GetGroupId() != req.GroupId {
Expand Down Expand Up @@ -321,6 +323,7 @@ func getCredentialsFromRestoreRequest(req *pb.RestoreRequest) *x.MinioCredential
}

func writeBackup(ctx context.Context, req *pb.RestoreRequest) error {
var isOld bool
res := LoadBackup(req.Location, req.BackupId, req.BackupNum,
getCredentialsFromRestoreRequest(req),
func(groupId uint32, in *loadBackupInput) (uint64, uint64, error) {
Expand All @@ -347,8 +350,12 @@ func writeBackup(ctx context.Context, req *pb.RestoreRequest) error {
return 0, 0, errors.Wrapf(err, "couldn't create gzip reader")
}

isOld = in.isOld
maxUid, maxNsId, err := loadFromBackup(pstore, &loadBackupInput{
r: gzReader, restoreTs: req.RestoreTs, preds: in.preds, dropOperations: in.dropOperations,
r: gzReader,
restoreTs: req.RestoreTs,
preds: in.preds,
dropOperations: in.dropOperations,
})
if err != nil {
return 0, 0, errors.Wrapf(err, "cannot write backup")
Expand Down Expand Up @@ -389,5 +396,8 @@ func writeBackup(ctx context.Context, req *pb.RestoreRequest) error {
if res.Err != nil {
return errors.Wrapf(res.Err, "cannot write backup")
}
if isOld && groups().ServesGroup(1) {
return fixBackup()
}
return nil
}
Loading