Skip to content

Commit

Permalink
Write group_id files to postings directories during restore.
Browse files Browse the repository at this point in the history
This PR changes the restore command to write a group_id file containing
the ID of the group to which the data belongs. This is the same
mechanism used by the bulk loader to ensure alphas with data for
reserved predicates are always assigned to the right group.
  • Loading branch information
martinmr committed Dec 5, 2019
1 parent f378588 commit 2b823e4
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 27 deletions.
8 changes: 1 addition & 7 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
var Bulk x.SubCommand

var defaultOutDir = "./out"
var groupFile = "group_id"

func init() {
Bulk.Cmd = &cobra.Command{
Expand Down Expand Up @@ -194,12 +193,7 @@ func run() {
x.Check(os.MkdirAll(dir, 0700))
opt.shardOutputDirs = append(opt.shardOutputDirs, dir)

groupFile := filepath.Join(dir, groupFile)
f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600)
x.Check(err)
x.Check2(f.WriteString(strconv.Itoa(i + 1)))
x.Check2(f.WriteString("\n"))
x.Check(f.Close())
x.Check(x.WriteGroupIdFile(dir, uint32(i+1)))
}

// Create a directory just for bulk loader's usage.
Expand Down
6 changes: 5 additions & 1 deletion ee/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func RunRestore(pdir, location, backupId string) (uint64, error) {
if err != nil {
return nil
}
return loadFromBackup(db, gzReader, preds)
if err := loadFromBackup(db, gzReader, preds); err != nil {
return err
}

return x.WriteGroupIdFile(dir, uint32(groupId))
})
}

Expand Down
8 changes: 8 additions & 0 deletions ee/backup/tests/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -249,6 +250,13 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m
_, err := backup.RunRestore("./data/restore", backupLocation, lastDir)
require.NoError(t, err)

for i, pdir := range []string{"p1", "p2", "p3"} {
pdir = filepath.Join("./data/restore", pdir)
groupId, err := x.ReadGroupIdFile(pdir)
require.NoError(t, err)
require.Equal(t, uint32(i+1), groupId)
}

restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs)
require.NoError(t, err)
t.Logf("--- Restored values: %+v\n", restored)
Expand Down
8 changes: 8 additions & 0 deletions ee/backup/tests/minio/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -258,6 +259,13 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string
err = testutil.ExecWithOpts(argv, testutil.CmdOpts{Dir: cwd})
require.NoError(t, err)

for i, pdir := range []string{"p1", "p2", "p3"} {
pdir = filepath.Join("./data/restore", pdir)
groupId, err := x.ReadGroupIdFile(pdir)
require.NoError(t, err)
require.Equal(t, uint32(i+1), groupId)
}

restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs)
require.NoError(t, err)
t.Logf("--- Restored values: %+v\n", restored)
Expand Down
23 changes: 4 additions & 19 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@
package worker

import (
"io/ioutil"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/dgraph-io/badger/v2"
Expand All @@ -33,10 +29,6 @@ import (
"golang.org/x/net/context"
)

const (
groupFile = "group_id"
)

// ServerState holds the state of the Dgraph server.
type ServerState struct {
FinishCh chan struct{} // channel to wait for all pending reqs to finish.
Expand Down Expand Up @@ -65,19 +57,12 @@ func InitServerState() {
State.initStorage()
go State.fillTimestampRequests()

contents, err := ioutil.ReadFile(filepath.Join(Config.PostingDir, groupFile))
if err != nil {
return
}

glog.Infof("Found group_id file inside posting directory %s. Will attempt to read.",
Config.PostingDir)
groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32)
groupId, err := x.ReadGroupIdFile(Config.PostingDir)
if err != nil {
glog.Warningf("Could not read %s file inside posting directory %s.",
groupFile, Config.PostingDir)
glog.Warningf("Could not read %s file inside posting directory %s.", x.GroupIdFileName,
Config.PostingDir)
}
x.WorkerConfig.ProposedGroupId = uint32(groupId)
x.WorkerConfig.ProposedGroupId = groupId
}

func (s *ServerState) runVlogGC(store *badger.DB) {
Expand Down
51 changes: 51 additions & 0 deletions x/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package x

import (
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/golang/glog"
Expand Down Expand Up @@ -149,3 +151,52 @@ func IsMissingOrEmptyDir(path string) (err error) {
err = ErrMissingDir
return
}

// WriteGroupIdFile writes the given group ID to the group_id file inside the given
// postings directory.
func WriteGroupIdFile(pdir string, group_id uint32) error {
if group_id == 0 {
return errors.Errorf("ID written to group_id file must be a positive number")
}

groupFile := filepath.Join(pdir, GroupIdFileName)
f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return nil
}
if _, err := f.WriteString(strconv.Itoa(int(group_id))); err != nil {
return err
}
if _, err := f.WriteString("\n"); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}

return nil
}

// ReadGroupIdFile reads the file at the given path and attempts to retrieve the
// group ID stored in it.
func ReadGroupIdFile(pdir string) (uint32, error) {
path := filepath.Join(pdir, GroupIdFileName)
info, err := os.Stat(path)
if os.IsNotExist(err) {
return 0, nil
}
if info.IsDir() {
return 0, errors.Errorf("Group ID file at %s is a directory", path)
}

contents, err := ioutil.ReadFile(path)
if err != nil {
return 0, err
}

groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32)
if err != nil {
return 0, err
}
return uint32(groupId), nil
}
5 changes: 5 additions & 0 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ const (
{"predicate":"dgraph.user.group","list":true, "reverse": true, "type": "uid"},
{"predicate":"dgraph.group.acl","type":"string"}
`
// GroupIdFileName is the name of the file storing the ID of the group to which
// the data in a postings directory belongs. This ID is used to join the proper
// group the first time an Alpha comes up with data from a restored backup or a
// bulk load.
GroupIdFileName = "group_id"
)

var (
Expand Down

0 comments on commit 2b823e4

Please sign in to comment.