diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index a43428e23b6..4845ac81768 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -38,7 +38,6 @@ import ( var Bulk x.SubCommand var defaultOutDir = "./out" -var groupFile = "group_id" func init() { Bulk.Cmd = &cobra.Command{ @@ -194,12 +193,7 @@ func run() { x.Check(os.MkdirAll(dir, 0700)) opt.shardOutputDirs = append(opt.shardOutputDirs, dir) - groupFile := filepath.Join(dir, groupFile) - f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600) - x.Check(err) - x.Check2(f.WriteString(strconv.Itoa(i + 1))) - x.Check2(f.WriteString("\n")) - x.Check(f.Close()) + x.Check(x.WriteGroupIdFile(dir, uint32(i+1))) } // Create a directory just for bulk loader's usage. diff --git a/ee/backup/restore.go b/ee/backup/restore.go index d2fea6de885..a97c7bc2138 100644 --- a/ee/backup/restore.go +++ b/ee/backup/restore.go @@ -55,7 +55,11 @@ func RunRestore(pdir, location, backupId string) (uint64, error) { if err != nil { return nil } - return loadFromBackup(db, gzReader, preds) + if err := loadFromBackup(db, gzReader, preds); err != nil { + return err + } + + return x.WriteGroupIdFile(dir, uint32(groupId)) }) } diff --git a/ee/backup/tests/filesystem/backup_test.go b/ee/backup/tests/filesystem/backup_test.go index 1cc1375410a..ce312d16036 100644 --- a/ee/backup/tests/filesystem/backup_test.go +++ b/ee/backup/tests/filesystem/backup_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strings" "testing" "time" @@ -249,6 +250,13 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m _, err := backup.RunRestore("./data/restore", backupLocation, lastDir) require.NoError(t, err) + for i, pdir := range []string{"p1", "p2", "p3"} { + pdir = filepath.Join("./data/restore", pdir) + groupId, err := x.ReadGroupIdFile(pdir) + require.NoError(t, err) + require.Equal(t, uint32(i+1), groupId) + } + restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs) require.NoError(t, err) t.Logf("--- Restored values: %+v\n", restored) diff --git a/ee/backup/tests/minio/backup_test.go b/ee/backup/tests/minio/backup_test.go index e486113aab5..59b18d50608 100644 --- a/ee/backup/tests/minio/backup_test.go +++ b/ee/backup/tests/minio/backup_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strings" "testing" "time" @@ -258,6 +259,13 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string err = testutil.ExecWithOpts(argv, testutil.CmdOpts{Dir: cwd}) require.NoError(t, err) + for i, pdir := range []string{"p1", "p2", "p3"} { + pdir = filepath.Join("./data/restore", pdir) + groupId, err := x.ReadGroupIdFile(pdir) + require.NoError(t, err) + require.Equal(t, uint32(i+1), groupId) + } + restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs) require.NoError(t, err) t.Logf("--- Restored values: %+v\n", restored) diff --git a/worker/server_state.go b/worker/server_state.go index d1a6be9a52f..0d1a67fbab6 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -17,12 +17,8 @@ package worker import ( - "io/ioutil" "math" "os" - "path/filepath" - "strconv" - "strings" "time" "github.com/dgraph-io/badger/v2" @@ -33,10 +29,6 @@ import ( "golang.org/x/net/context" ) -const ( - groupFile = "group_id" -) - // ServerState holds the state of the Dgraph server. type ServerState struct { FinishCh chan struct{} // channel to wait for all pending reqs to finish. @@ -65,19 +57,12 @@ func InitServerState() { State.initStorage() go State.fillTimestampRequests() - contents, err := ioutil.ReadFile(filepath.Join(Config.PostingDir, groupFile)) - if err != nil { - return - } - - glog.Infof("Found group_id file inside posting directory %s. Will attempt to read.", - Config.PostingDir) - groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32) + groupId, err := x.ReadGroupIdFile(Config.PostingDir) if err != nil { - glog.Warningf("Could not read %s file inside posting directory %s.", - groupFile, Config.PostingDir) + glog.Warningf("Could not read %s file inside posting directory %s.", x.GroupIdFileName, + Config.PostingDir) } - x.WorkerConfig.ProposedGroupId = uint32(groupId) + x.WorkerConfig.ProposedGroupId = groupId } func (s *ServerState) runVlogGC(store *badger.DB) { diff --git a/x/file.go b/x/file.go index 25d56b22179..a786247f2f6 100644 --- a/x/file.go +++ b/x/file.go @@ -18,8 +18,10 @@ package x import ( "io" + "io/ioutil" "os" "path/filepath" + "strconv" "strings" "github.com/golang/glog" @@ -149,3 +151,52 @@ func IsMissingOrEmptyDir(path string) (err error) { err = ErrMissingDir return } + +// WriteGroupIdFile writes the given group ID to the group_id file inside the given +// postings directory. +func WriteGroupIdFile(pdir string, group_id uint32) error { + if group_id == 0 { + return errors.Errorf("ID written to group_id file must be a positive number") + } + + groupFile := filepath.Join(pdir, GroupIdFileName) + f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + return nil + } + if _, err := f.WriteString(strconv.Itoa(int(group_id))); err != nil { + return err + } + if _, err := f.WriteString("\n"); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + return nil +} + +// ReadGroupIdFile reads the file at the given path and attempts to retrieve the +// group ID stored in it. +func ReadGroupIdFile(pdir string) (uint32, error) { + path := filepath.Join(pdir, GroupIdFileName) + info, err := os.Stat(path) + if os.IsNotExist(err) { + return 0, nil + } + if info.IsDir() { + return 0, errors.Errorf("Group ID file at %s is a directory", path) + } + + contents, err := ioutil.ReadFile(path) + if err != nil { + return 0, err + } + + groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32) + if err != nil { + return 0, err + } + return uint32(groupId), nil +} diff --git a/x/x.go b/x/x.go index 7af1d8f385b..90345562575 100644 --- a/x/x.go +++ b/x/x.go @@ -109,6 +109,11 @@ const ( {"predicate":"dgraph.user.group","list":true, "reverse": true, "type": "uid"}, {"predicate":"dgraph.group.acl","type":"string"} ` + // GroupIdFileName is the name of the file storing the ID of the group to which + // the data in a postings directory belongs. This ID is used to join the proper + // group the first time an Alpha comes up with data from a restored backup or a + // bulk load. + GroupIdFileName = "group_id" ) var (