From 2b823e4611e2c2ae4ad1bfd375d716f4e8b41527 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 5 Dec 2019 14:04:42 -0800 Subject: [PATCH] Write group_id files to postings directories during restore. This PR changes the restore command to write a group_id file containing the ID of the group to which the data belongs. This is the same mechanism used by the bulk loader to ensure alphas with data for reserved predicates are always assigned to the right group. --- dgraph/cmd/bulk/run.go | 8 +--- ee/backup/restore.go | 6 ++- ee/backup/tests/filesystem/backup_test.go | 8 ++++ ee/backup/tests/minio/backup_test.go | 8 ++++ worker/server_state.go | 23 ++-------- x/file.go | 51 +++++++++++++++++++++++ x/x.go | 5 +++ 7 files changed, 82 insertions(+), 27 deletions(-) diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index a43428e23b6..4845ac81768 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -38,7 +38,6 @@ import ( var Bulk x.SubCommand var defaultOutDir = "./out" -var groupFile = "group_id" func init() { Bulk.Cmd = &cobra.Command{ @@ -194,12 +193,7 @@ func run() { x.Check(os.MkdirAll(dir, 0700)) opt.shardOutputDirs = append(opt.shardOutputDirs, dir) - groupFile := filepath.Join(dir, groupFile) - f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600) - x.Check(err) - x.Check2(f.WriteString(strconv.Itoa(i + 1))) - x.Check2(f.WriteString("\n")) - x.Check(f.Close()) + x.Check(x.WriteGroupIdFile(dir, uint32(i+1))) } // Create a directory just for bulk loader's usage. 
diff --git a/ee/backup/restore.go b/ee/backup/restore.go index d2fea6de885..a97c7bc2138 100644 --- a/ee/backup/restore.go +++ b/ee/backup/restore.go @@ -55,7 +55,11 @@ func RunRestore(pdir, location, backupId string) (uint64, error) { if err != nil { return nil } - return loadFromBackup(db, gzReader, preds) + if err := loadFromBackup(db, gzReader, preds); err != nil { + return err + } + + return x.WriteGroupIdFile(dir, uint32(groupId)) }) } diff --git a/ee/backup/tests/filesystem/backup_test.go b/ee/backup/tests/filesystem/backup_test.go index 1cc1375410a..ce312d16036 100644 --- a/ee/backup/tests/filesystem/backup_test.go +++ b/ee/backup/tests/filesystem/backup_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strings" "testing" "time" @@ -249,6 +250,13 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m _, err := backup.RunRestore("./data/restore", backupLocation, lastDir) require.NoError(t, err) + for i, pdir := range []string{"p1", "p2", "p3"} { + pdir = filepath.Join("./data/restore", pdir) + groupId, err := x.ReadGroupIdFile(pdir) + require.NoError(t, err) + require.Equal(t, uint32(i+1), groupId) + } + restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs) require.NoError(t, err) t.Logf("--- Restored values: %+v\n", restored) diff --git a/ee/backup/tests/minio/backup_test.go b/ee/backup/tests/minio/backup_test.go index e486113aab5..59b18d50608 100644 --- a/ee/backup/tests/minio/backup_test.go +++ b/ee/backup/tests/minio/backup_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strings" "testing" "time" @@ -258,6 +259,13 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string err = testutil.ExecWithOpts(argv, testutil.CmdOpts{Dir: cwd}) require.NoError(t, err) + for i, pdir := range []string{"p1", "p2", "p3"} { + pdir = filepath.Join("./data/restore", pdir) + groupId, err := x.ReadGroupIdFile(pdir) + require.NoError(t, 
err) + require.Equal(t, uint32(i+1), groupId) + } + restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs) require.NoError(t, err) t.Logf("--- Restored values: %+v\n", restored) diff --git a/worker/server_state.go b/worker/server_state.go index d1a6be9a52f..0d1a67fbab6 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -17,12 +17,8 @@ package worker import ( - "io/ioutil" "math" "os" - "path/filepath" - "strconv" - "strings" "time" "github.com/dgraph-io/badger/v2" @@ -33,10 +29,6 @@ import ( "golang.org/x/net/context" ) -const ( - groupFile = "group_id" -) - // ServerState holds the state of the Dgraph server. type ServerState struct { FinishCh chan struct{} // channel to wait for all pending reqs to finish. @@ -65,19 +57,12 @@ func InitServerState() { State.initStorage() go State.fillTimestampRequests() - contents, err := ioutil.ReadFile(filepath.Join(Config.PostingDir, groupFile)) - if err != nil { - return - } - - glog.Infof("Found group_id file inside posting directory %s. 
Will attempt to read.", - Config.PostingDir) - groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32) + groupId, err := x.ReadGroupIdFile(Config.PostingDir) if err != nil { - glog.Warningf("Could not read %s file inside posting directory %s.", - groupFile, Config.PostingDir) + glog.Warningf("Could not read %s file inside posting directory %s.", x.GroupIdFileName, + Config.PostingDir) } - x.WorkerConfig.ProposedGroupId = uint32(groupId) + x.WorkerConfig.ProposedGroupId = groupId } func (s *ServerState) runVlogGC(store *badger.DB) { diff --git a/x/file.go b/x/file.go index 25d56b22179..a786247f2f6 100644 --- a/x/file.go +++ b/x/file.go @@ -18,8 +18,10 @@ package x import ( "io" + "io/ioutil" "os" "path/filepath" + "strconv" "strings" "github.com/golang/glog" @@ -149,3 +151,52 @@ func IsMissingOrEmptyDir(path string) (err error) { err = ErrMissingDir return } + +// WriteGroupIdFile writes the given group ID to the group_id file inside the given +// postings directory. +func WriteGroupIdFile(pdir string, group_id uint32) error { + if group_id == 0 { + return errors.Errorf("ID written to group_id file must be a positive number") + } + + groupFile := filepath.Join(pdir, GroupIdFileName) + f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) + if err != nil { + return err + } + if _, err := f.WriteString(strconv.Itoa(int(group_id))); err != nil { + return err + } + if _, err := f.WriteString("\n"); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + return nil +} + +// ReadGroupIdFile reads the group_id file inside the given postings directory and +// returns the group ID stored in it, or 0 if the file does not exist.
+func ReadGroupIdFile(pdir string) (uint32, error) { + path := filepath.Join(pdir, GroupIdFileName) + info, err := os.Stat(path) + if os.IsNotExist(err) { + return 0, nil + } + if err == nil && info.IsDir() { + return 0, errors.Errorf("Group ID file at %s is a directory", path) + } + + contents, err := ioutil.ReadFile(path) + if err != nil { + return 0, err + } + + groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32) + if err != nil { + return 0, err + } + return uint32(groupId), nil +} diff --git a/x/x.go b/x/x.go index cf7f2025eb8..d4fc49c3702 100644 --- a/x/x.go +++ b/x/x.go @@ -109,6 +109,11 @@ const ( {"predicate":"dgraph.user.group","list":true, "reverse": true, "type": "uid"}, {"predicate":"dgraph.group.acl","type":"string"} ` + // GroupIdFileName is the name of the file storing the ID of the group to which + // the data in a postings directory belongs. This ID is used to join the proper + // group the first time an Alpha comes up with data from a restored backup or a + // bulk load. + GroupIdFileName = "group_id" ) var (