Skip to content

Commit

Permalink
fix(bulk): save schemaMap after map phase (#7188) (#7351)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajeetdsouza authored Jan 22, 2021
1 parent 92e837f commit c6ae3fa
Show file tree
Hide file tree
Showing 4 changed files with 695 additions and 307 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/merge_shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
const (
mapShardDir = "map_output"
reduceShardDir = "shards"
bufferDir = "buffer"
bufferDir = "buffer"
)

func mergeMapShardsIntoReduceShards(opt *options) {
Expand Down
37 changes: 36 additions & 1 deletion dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package bulk
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math"
"net/http"
Expand All @@ -29,6 +30,7 @@ import (
"strconv"
"strings"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"

"github.com/dgraph-io/dgraph/ee/enc"
Expand Down Expand Up @@ -298,9 +300,42 @@ func run() {
defer os.RemoveAll(bufDir)

loader := newLoader(&opt)
if !opt.SkipMapPhase {

const bulkMetaFilename = "bulk.meta"
bulkMetaPath := filepath.Join(opt.TmpDir, bulkMetaFilename)

if opt.SkipMapPhase {
bulkMetaData, err := ioutil.ReadFile(bulkMetaPath)
if err != nil {
fmt.Fprintln(os.Stderr, "Error reading from bulk meta file")
os.Exit(1)
}

var bulkMeta pb.BulkMeta
if err = bulkMeta.Unmarshal(bulkMetaData); err != nil {
fmt.Fprintln(os.Stderr, "Error deserializing bulk meta file")
os.Exit(1)
}

loader.prog.mapEdgeCount = bulkMeta.EdgeCount
loader.schema.schemaMap = bulkMeta.SchemaMap
} else {
loader.mapStage()
mergeMapShardsIntoReduceShards(&opt)

bulkMeta := pb.BulkMeta{
EdgeCount: loader.prog.mapEdgeCount,
SchemaMap: loader.schema.schemaMap,
}
bulkMetaData, err := bulkMeta.Marshal()
if err != nil {
fmt.Fprintln(os.Stderr, "Error serializing bulk meta file")
os.Exit(1)
}
if err = ioutil.WriteFile(bulkMetaPath, bulkMetaData, 0644); err != nil {
fmt.Fprintln(os.Stderr, "Error writing to bulk meta file")
os.Exit(1)
}
}
loader.reduceStage()
loader.writeSchema()
Expand Down
6 changes: 6 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -687,4 +687,10 @@ message UpdateGraphQLSchemaResponse {
uint64 uid = 1;
}

// BulkMeta stores metadata from the map phase of the bulk loader.
message BulkMeta {
int64 edge_count = 1;
map<string, SchemaUpdate> schema_map = 2;
}

// vim: noexpandtab sw=2 ts=2
Loading

0 comments on commit c6ae3fa

Please sign in to comment.