Reduce memory consumption in bulk loader #3724

Merged
13 commits, merged on Jul 31, 2019
dgraph/cmd/bulk/loader.go: 2 changes (1 addition, 1 deletion)
@@ -50,7 +50,7 @@ type options struct {
ReplaceOutDir bool
TmpDir string
NumGoroutines int
- MapBufSize int64
+ MapBufSize uint64
SkipMapPhase bool
CleanupTmp bool
NumReducers int
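MapBufSize switches from int64 to uint64, presumably so the mapper's flush check can compare it directly against the new encodedSize counter (a uint64) introduced below in mapper.go. A minimal sketch of why the types have to agree; this is illustrative only, and the variable names are made up:

package main

// Minimal sketch, not dgraph code: the mapper tracks encodedSize as a uint64,
// and Go will not compare integers of different types without an explicit
// conversion, so the option type follows suit.
func main() {
	var encodedSize uint64 = 1 << 20
	var mapBufSizeSigned int64 = 64 << 20
	var mapBufSize uint64 = 64 << 20

	// _ = encodedSize >= mapBufSizeSigned // does not compile: mismatched types uint64 and int64
	_ = encodedSize >= uint64(mapBufSizeSigned) // legal, but needs a cast at every check
	_ = encodedSize >= mapBufSize               // with a uint64 option, no cast is needed
}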
dgraph/cmd/bulk/mapper.go: 53 changes (23 additions, 30 deletions)
@@ -41,7 +41,6 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
farm "github.com/dgryski/go-farm"
"github.com/gogo/protobuf/proto"
)

type mapper struct {
@@ -52,8 +51,10 @@
type shardState struct {
// Buffer up map entries until we have a sufficient amount, then sort and
// write them to file.
- entriesBuf []byte
- mu sync.Mutex // Allow only 1 write per shard at a time.
+ //entriesBuf []byte
+ entries []*pb.MapEntry
+ encodedSize uint64
+ mu sync.Mutex // Allow only 1 write per shard at a time.
}

func newMapper(st *state) *mapper {
@@ -78,34 +79,24 @@ func less(lhs, rhs *pb.MapEntry) bool {
return lhsUID < rhsUID
}

- func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
+ func (m *mapper) writeMapEntriesToFile(entries []*pb.MapEntry, encodedSize uint64, shardIdx int) {
defer m.shards[shardIdx].mu.Unlock() // Locked by caller.

- buf := entriesBuf
- var entries []*pb.MapEntry
- for len(buf) > 0 {
- sz, n := binary.Uvarint(buf)
- x.AssertTrue(n > 0)
- buf = buf[n:]
- me := new(pb.MapEntry)
- x.Check(proto.Unmarshal(buf[:sz], me))
- buf = buf[sz:]
- entries = append(entries, me)
- }
-
sort.Slice(entries, func(i, j int) bool {
return less(entries[i], entries[j])
})

- buf = entriesBuf
+ buf := make([]byte, encodedSize, encodedSize)
+ offset := 0
for _, me := range entries {
- n := binary.PutUvarint(buf, uint64(me.Size()))
- buf = buf[n:]
- n, err := me.MarshalTo(buf)
+ n := binary.PutUvarint(buf[offset:], uint64(me.Size()))
+ offset += n
+ n, err := me.MarshalTo(buf[offset:])
x.Check(err)
- buf = buf[n:]
+ offset += n
}
- x.AssertTrue(len(buf) == 0)
+ // trim buf down to the bytes actually written; encodedSize over-estimates the varint prefixes
+ buf = buf[0:offset]

fileNum := atomic.AddUint32(&m.mapFileId, 1)
filename := filepath.Join(
@@ -115,7 +106,7 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) {
fmt.Sprintf("%06d.map", fileNum),
)
x.Check(os.MkdirAll(filepath.Dir(filename), 0755))
- x.Check(x.WriteFileSync(filename, entriesBuf, 0644))
+ x.Check(x.WriteFileSync(filename, buf, 0644))
}
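The rewritten body above sorts the in-memory entries and then serializes them itself: it preallocates one output buffer from the tracked encodedSize, writes each entry as a uvarint length prefix followed by the marshalled bytes, and finally trims the buffer to the offset actually used. The old code had to unmarshal the shard's byte buffer back into pb.MapEntry values here just to sort them; keeping the entries as structs removes that second decode pass. Below is a self-contained sketch of the framing pattern with a stand-in record type instead of pb.MapEntry; everything in it is illustrative, not dgraph code:

package main

import (
	"encoding/binary"
	"fmt"
)

// record stands in for pb.MapEntry: anything that can report its encoded size
// up front and marshal itself into a caller-provided buffer.
type record struct{ payload []byte }

func (r record) Size() int { return len(r.payload) }

func (r record) MarshalTo(b []byte) (int, error) { return copy(b, r.payload), nil }

// encodeAll mirrors the pattern in the new writeMapEntriesToFile: preallocate
// one buffer from a per-entry upper bound, write "uvarint length + body"
// records back to back, then trim to the bytes actually used.
func encodeAll(entries []record) []byte {
	var upperBound uint64
	for _, e := range entries {
		upperBound += binary.MaxVarintLen64 + uint64(e.Size())
	}

	buf := make([]byte, upperBound)
	offset := 0
	for _, e := range entries {
		offset += binary.PutUvarint(buf[offset:], uint64(e.Size()))
		n, err := e.MarshalTo(buf[offset:])
		if err != nil {
			panic(err)
		}
		offset += n
	}
	return buf[:offset] // real length prefixes are usually shorter than MaxVarintLen64
}

func main() {
	out := encodeAll([]record{{[]byte("abc")}, {[]byte("defgh")}})
	fmt.Println(len(out)) // 2 one-byte length prefixes + 8 payload bytes = 10
}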

func (m *mapper) run(inputFormat chunker.InputFormat) {
@@ -147,20 +138,22 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {

for i := range m.shards {
sh := &m.shards[i]
- if len(sh.entriesBuf) >= int(m.opt.MapBufSize) {
+ if sh.encodedSize >= m.opt.MapBufSize {
sh.mu.Lock() // One write at a time.
- go m.writeMapEntriesToFile(sh.entriesBuf, i)
- sh.entriesBuf = make([]byte, 0, m.opt.MapBufSize*11/10)
+ go m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
+ // clear the entries and encodedSize for the next batch
+ sh.entries = make([]*pb.MapEntry, 0, 10)
+ sh.encodedSize = 0
}
}
}
}

for i := range m.shards {
sh := &m.shards[i]
- if len(sh.entriesBuf) > 0 {
+ if len(sh.entries) > 0 {
sh.mu.Lock() // One write at a time.
- m.writeMapEntriesToFile(sh.entriesBuf, i)
+ m.writeMapEntriesToFile(sh.entries, sh.encodedSize, i)
}
m.shards[i].mu.Lock() // Ensure that the last file write finishes.
}
@@ -180,8 +173,8 @@ func (m *mapper) addMapEntry(key []byte, p *pb.Posting, shard int) {
sh := &m.shards[shard]

var err error
- sh.entriesBuf = x.AppendUvarint(sh.entriesBuf, uint64(me.Size()))
- sh.entriesBuf, err = x.AppendProtoMsg(sh.entriesBuf, me)
+ sh.entries = append(sh.entries, me)
+ sh.encodedSize += binary.MaxVarintLen64 + uint64(me.Size())
x.Check(err)
}

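addMapEntry no longer serializes anything; it appends the entry and grows encodedSize by binary.MaxVarintLen64 plus the entry's encoded size. MaxVarintLen64 (10 bytes) is the worst case for the uvarint length prefix, so encodedSize is a slight over-estimate, which is exactly what lets writeMapEntriesToFile allocate the output buffer up front and then slice it down to offset. A quick, standard-library-only illustration of the bound:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// binary.MaxVarintLen64 is 10: the worst-case size of a 64-bit uvarint.
	fmt.Println(binary.MaxVarintLen64) // 10

	buf := make([]byte, binary.MaxVarintLen64)
	for _, size := range []uint64{100, 300, 1 << 20} {
		n := binary.PutUvarint(buf, size)
		fmt.Printf("a %d-byte entry gets a %d-byte length prefix\n", size, n)
	}
	// Typical map entries are small, so the real prefix is 1-3 bytes; budgeting
	// 10 per entry keeps encodedSize a safe over-estimate of the final buffer.
}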
dgraph/cmd/bulk/run.go: 2 changes (1 addition, 1 deletion)
@@ -105,7 +105,7 @@ func run() {
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
- MapBufSize: int64(Bulk.Conf.GetInt("mapoutput_mb")),
+ MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
NumReducers: Bulk.Conf.GetInt("reducers"),
x/proto.go: 61 changes (0 additions, 61 deletions)

This file was deleted.
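x/proto.go held the append-style encoding helpers the old addMapEntry relied on (x.AppendUvarint and x.AppendProtoMsg, visible in the removed lines above). Once the mapper stops incrementally encoding entries into a per-shard byte buffer, those helpers have no remaining callers, so the file can go. For reference, here is a sketch of what helpers with those call-site signatures typically look like; this is inferred from the usage above, not the deleted file's actual contents:

// NOT the deleted file's contents: a sketch inferred from the call sites in
// the old addMapEntry of what append-style encode helpers like these do.
package x

import (
	"encoding/binary"

	"github.com/gogo/protobuf/proto"
)

// AppendUvarint appends the uvarint encoding of v to b and returns the
// extended slice.
func AppendUvarint(b []byte, v uint64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], v)
	return append(b, tmp[:n]...)
}

// AppendProtoMsg appends the wire encoding of msg to b and returns the
// extended slice.
func AppendProtoMsg(b []byte, msg proto.Message) ([]byte, error) {
	out, err := proto.Marshal(msg)
	if err != nil {
		return b, err
	}
	return append(b, out...), nil
}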