Bulk loader allocates reserved predicates in first reduce shard. (#4202)
This PR contains a couple of related changes.

The bulk loader now forces reserved predicates to end up in the first reduce shard. It
also writes a file into each posting directory with the proposed group ID for
that shard.
Dgraph reads that file during startup and uses it to request the right group
ID from Zero.
The change is tested by modifying the 21million test to use multiple groups
and adding a new query that verifies the number of nodes with a dgraph.type predicate.
Without the fix, dgraph.type sometimes ends up in a group other than the first one.

Fixes #3968.
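A minimal, runnable sketch of the group_id convention described above (writeGroupFile and readProposedGroup are illustrative names, not Dgraph APIs): the bulk loader writes the 1-based group ID plus a newline into a group_id file inside each posting directory, and an Alpha parses that file at startup to decide which group to request from Zero.

package main

import (
	"fmt"
	"io/ioutil"
	"path/filepath"
	"strconv"
	"strings"
)

// writeGroupFile mimics what the bulk loader does for reduce shard i:
// it stores "i+1\n" in <postingDir>/group_id (group IDs are 1-based).
func writeGroupFile(postingDir string, reduceShard int) error {
	data := []byte(strconv.Itoa(reduceShard+1) + "\n")
	return ioutil.WriteFile(filepath.Join(postingDir, "group_id"), data, 0600)
}

// readProposedGroup mimics the startup side: a missing file simply means
// there is no proposal, and Zero assigns a group as before.
func readProposedGroup(postingDir string) (uint32, error) {
	contents, err := ioutil.ReadFile(filepath.Join(postingDir, "group_id"))
	if err != nil {
		return 0, err
	}
	gid, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32)
	if err != nil {
		return 0, err
	}
	return uint32(gid), nil
}

func main() {
	dir, err := ioutil.TempDir("", "p")
	if err != nil {
		panic(err)
	}
	if err := writeGroupFile(dir, 0); err != nil { // first reduce shard -> group 1
		panic(err)
	}
	gid, err := readProposedGroup(dir)
	fmt.Println(gid, err) // 1 <nil>
}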
martinmr authored Oct 24, 2019
1 parent 97c5bca commit 2813b34
Showing 13 changed files with 448 additions and 316 deletions.
22 changes: 16 additions & 6 deletions dgraph/cmd/bulk/merge_shards.go
@@ -32,7 +32,13 @@ const (
 )
 
 func mergeMapShardsIntoReduceShards(opt options) {
-	mapShards := shardDirs(filepath.Join(opt.TmpDir, mapShardDir))
+	shardDirs := readShardDirs(filepath.Join(opt.TmpDir, mapShardDir))
+	// First shard is handled differently because it contains reserved predicates.
+	x.AssertTrue(len(shardDirs) > 0)
+	firstShard := shardDirs[0]
+	// Sort the rest of the shards by size to allow the largest shards to be shuffled first.
+	shardDirs = shardDirs[1:]
+	sortBySize(shardDirs)
 
 	var reduceShards []string
 	for i := 0; i < opt.ReduceShards; i++ {
@@ -41,9 +47,15 @@ func mergeMapShardsIntoReduceShards(opt options) {
 		reduceShards = append(reduceShards, shardDir)
 	}
 
+	// Put the first map shard in the first reduce shard since it contains all the reserved
+	// predicates.
+	reduceShard := filepath.Join(reduceShards[0], filepath.Base(firstShard))
+	fmt.Printf("Shard %s -> Reduce %s\n", firstShard, reduceShard)
+	x.Check(os.Rename(firstShard, reduceShard))
+
 	// Heuristic: put the largest map shard into the smallest reduce shard
 	// until there are no more map shards left. Should be a good approximation.
-	for _, shard := range mapShards {
+	for _, shard := range shardDirs {
 		sortBySize(reduceShards)
 		reduceShard := filepath.Join(
 			reduceShards[len(reduceShards)-1], filepath.Base(shard))
@@ -52,7 +64,7 @@ func mergeMapShardsIntoReduceShards(opt options) {
 	}
 }
 
-func shardDirs(d string) []string {
+func readShardDirs(d string) []string {
 	dir, err := os.Open(d)
 	x.Check(err)
 	shards, err := dir.Readdirnames(0)
@@ -61,9 +73,7 @@ func shardDirs(d string) []string {
 	for i, shard := range shards {
 		shards[i] = filepath.Join(d, shard)
 	}
-
-	// Allow largest shards to be shuffled first.
-	sortBySize(shards)
+	sort.Strings(shards)
 	return shards
 }
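The merge step above leans on sortBySize, which is defined elsewhere in the bulk package and not shown in this diff. A rough sketch of a largest-first sort over shard directories, assuming "size" means total bytes on disk (dirSize and sortShardsBySize are illustrative names, not this package's code):

package bulk

import (
	"os"
	"path/filepath"
	"sort"
)

// dirSize returns the total size in bytes of all regular files under dir.
func dirSize(dir string) int64 {
	var size int64
	_ = filepath.Walk(dir, func(_ string, info os.FileInfo, err error) error {
		if err == nil && !info.IsDir() {
			size += info.Size()
		}
		return nil
	})
	return size
}

// sortShardsBySize orders shard directories largest first, so the biggest map
// shards get placed before the smaller ones in the greedy loop above.
// (Recomputing sizes inside the comparator is fine for a sketch.)
func sortShardsBySize(dirs []string) {
	sort.Slice(dirs, func(i, j int) bool {
		return dirSize(dirs[i]) > dirSize(dirs[j])
	})
}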
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/reduce.go
@@ -46,7 +46,7 @@ type reducer struct {
 }
 
 func (r *reducer) run() error {
-	dirs := shardDirs(filepath.Join(r.opt.TmpDir, reduceShardDir))
+	dirs := readShardDirs(filepath.Join(r.opt.TmpDir, reduceShardDir))
 	x.AssertTrue(len(dirs) == r.opt.ReduceShards)
 	x.AssertTrue(len(r.opt.shardOutputDirs) == r.opt.ReduceShards)
 
8 changes: 8 additions & 0 deletions dgraph/cmd/bulk/run.go
@@ -38,6 +38,7 @@ import (
 var Bulk x.SubCommand
 
 var defaultOutDir = "./out"
+var groupFile = "group_id"
 
 func init() {
 	Bulk.Cmd = &cobra.Command{
@@ -192,6 +193,13 @@ func run() {
 		dir := filepath.Join(opt.OutDir, strconv.Itoa(i), "p")
 		x.Check(os.MkdirAll(dir, 0700))
 		opt.shardOutputDirs = append(opt.shardOutputDirs, dir)
+
+		groupFile := filepath.Join(dir, groupFile)
+		f, err := os.OpenFile(groupFile, os.O_CREATE|os.O_WRONLY, 0600)
+		x.Check(err)
+		x.Check2(f.WriteString(strconv.Itoa(i + 1)))
+		x.Check2(f.WriteString("\n"))
+		x.Check(f.Close())
 	}
 
 	// Create a directory just for bulk loader's usage.
11 changes: 10 additions & 1 deletion dgraph/cmd/bulk/shard_map.go
@@ -16,7 +16,11 @@
 
 package bulk
 
-import "sync"
+import (
+	"sync"
+
+	"github.com/dgraph-io/dgraph/x"
+)
 
 type shardMap struct {
 	sync.RWMutex
@@ -33,6 +37,11 @@ func newShardMap(numShards int) *shardMap {
 }
 
 func (m *shardMap) shardFor(pred string) int {
+	// Always assign NQuads with reserved predicates to the first map shard.
+	if x.IsReservedPredicate(pred) {
+		return 0
+	}
+
 	m.RLock()
 	shard, ok := m.predToShard[pred]
 	m.RUnlock()
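For context, x.IsReservedPredicate covers the predicates Dgraph manages internally, such as dgraph.type; to a first approximation it is a check on the dgraph. name prefix. A rough stand-in, not the actual x package code:

package main

import (
	"fmt"
	"strings"
)

// isReservedPredicate is a rough stand-in for x.IsReservedPredicate: internal
// predicates such as dgraph.type share the "dgraph." prefix. The real helper
// lives in Dgraph's x package and may check more than just the prefix.
func isReservedPredicate(pred string) bool {
	return strings.HasPrefix(strings.ToLower(pred), "dgraph.")
}

func main() {
	fmt.Println(isReservedPredicate("dgraph.type")) // true
	fmt.Println(isReservedPredicate("name"))        // false
}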
6 changes: 6 additions & 0 deletions dgraph/cmd/zero/zero.go
@@ -512,6 +512,12 @@ func (s *Server) Connect(ctx context.Context,
 			// We need more servers here, so let's add it.
 			proposal.Member = m
 			return proposal
+		} else if m.ForceGroupId {
+			// If the group ID was taken from the group_id file, force the member
+			// to be in this group even if the group is at capacity. This should
+			// not happen if users properly initialize a cluster after a bulk load.
+			proposal.Member = m
+			return proposal
 		}
 		// Already have plenty of servers serving this group.
 	}
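The rule Zero applies above can be restated in isolation: admit a member into the group it asked for if that group still needs servers, or if the member's group came from a bulk load's group_id file. A distilled sketch, not code from this PR, with an illustrative helper name:

package zero

// shouldJoinGroup distills the Connect() decision above: admit the member if
// the requested group still has room, or if the member insists on that group
// because its group ID was read from a bulk-loaded group_id file.
func shouldJoinGroup(groupHasRoom, forceGroupID bool) bool {
	if groupHasRoom {
		return true
	}
	return forceGroupID
}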
18 changes: 17 additions & 1 deletion edgraph/server.go
@@ -19,9 +19,11 @@ package edgraph
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"math"
 	"math/rand"
 	"os"
+	"path/filepath"
 	"sort"
 	"strconv"
 	"strings"
@@ -60,6 +62,7 @@ import (
 const (
 	methodMutate = "Server.Mutate"
 	methodQuery  = "Server.Query"
+	groupFile    = "group_id"
 )
 
 // ServerState holds the state of the Dgraph server.
@@ -96,8 +99,21 @@ func InitServerState() {
 	State.needTs = make(chan tsReq, 100)
 
 	State.initStorage()
-
 	go State.fillTimestampRequests()
+
+	contents, err := ioutil.ReadFile(filepath.Join(Config.PostingDir, groupFile))
+	if err != nil {
+		return
+	}
+
+	glog.Infof("Found group_id file inside posting directory %s. Will attempt to read.",
+		Config.PostingDir)
+	groupId, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 0, 32)
+	if err != nil {
+		glog.Warningf("Could not read %s file inside posting directory %s.",
+			groupFile, Config.PostingDir)
+	}
+	x.WorkerConfig.ProposedGroupId = uint32(groupId)
 }
 
 func (s *ServerState) runVlogGC(store *badger.DB) {
1 change: 1 addition & 0 deletions protos/pb.proto
@@ -130,6 +130,7 @@ message Member {
 	uint64 last_update = 6;
 
 	bool cluster_info_only = 13;
+	bool force_group_id = 14;
 }
 
 message Group {