
Commit c04d6be

ashish-goswami authored and dna2github committed
Use rampMeter for Executor (hypermodeinc#5503)
Currently we have a rampMeter while applying proposals. This rampMeter throttles the memory used by the mutations carried in proposals waiting to be applied. However, this works correctly only in normal mode. In ludicrous mode we apply the mutations for these proposals asynchronously, so the rampMeter above doesn't work as expected, and memory usage grows large when there are many proposals to apply. This PR introduces a rampMeter for the executor as well, since the executor is what applies mutations in ludicrous mode.

Testing: I tested this PR by running the live loader on the 21M dataset in ludicrous mode. With this PR, the live loader completes in ~5-6 minutes. When I instead changed maxPendingEdgesSize to 64KB (rather than the current 64MB), the live loader completed in ~10-11 minutes.

Benchmarking: I benchmarked using data generated by the script below:

    package main

    import (
        "bytes"
        "fmt"
        "os"
    )

    var (
        total int = 100000000
        pred      = 1024
    )

    func main() {
        f, err := os.OpenFile("test.rdf", os.O_CREATE|os.O_RDWR, 0755)
        if err != nil {
            panic(err)
        }
        defer f.Close()

        totalPerPred := total / pred
        buf := bytes.NewBuffer(nil)
        count := 1
        for i := 0; i < totalPerPred; i++ {
            for j := 0; j < pred; j++ {
                rec := fmt.Sprintf(`_:record_%d <pred_%d> "value_%d" .`, count, j, count)
                buf.WriteString(rec)
                buf.WriteString("\n")
                count++
                if count%100000 == 0 {
                    buf.WriteTo(f)
                    buf.Reset()
                }
            }
        }
        buf.WriteTo(f)
        if err := f.Sync(); err != nil {
            panic(err)
        }
        fmt.Println("Done writing to file: ", count)
    }

The script above generates ~100M records in total: 1024 predicates with ~97K records per predicate.

Master:
    Time to finish live loader: 8m44s
    Alpha RAM (RES) usage: 10.9 GB

This PR with maxPendingEdgesSize = 64KB:
    Time to finish live loader: 8m48s
    Alpha RAM (RES) usage: 9.6 GB

This PR with maxPendingEdgesSize = 64MB:
    Time to finish live loader: 8m32s
    Alpha RAM (RES) usage: 10.5 GB
1 parent a4d8913 commit c04d6be

File tree

2 files changed, +25 −6 lines

worker/draft.go

+8 −5
@@ -849,17 +849,20 @@ func (n *node) proposeSnapshot(discardN int) error {
 	return n.Raft().Propose(n.ctx, data)
 }
 
-const maxPendingSize int64 = 64 << 20 // in bytes.
+const (
+	maxPendingSize int64 = 64 << 20 // in bytes.
+	nodeApplyChan        = "raft node applyCh"
+)
 
-func (n *node) rampMeter() {
+func rampMeter(address *int64, maxSize int64, component string) {
 	start := time.Now()
 	defer func() {
 		if dur := time.Since(start); dur > time.Second {
-			glog.Infof("Blocked pushing to applyCh for %v", dur.Round(time.Millisecond))
+			glog.Infof("Blocked pushing to %s for %v", component, dur.Round(time.Millisecond))
 		}
 	}()
 	for {
-		if atomic.LoadInt64(&n.pendingSize) <= maxPendingSize {
+		if atomic.LoadInt64(address) <= maxSize {
 			return
 		}
 		time.Sleep(3 * time.Millisecond)
@@ -1168,7 +1171,7 @@ func (n *node) Run() {
 		// Apply the meter before adding the proposals' sizes to pendingSize, so a
 		// single crazy-big proposal can still be pushed to applyCh. If we did this
 		// after adding the size, we could block forever in rampMeter.
-		n.rampMeter()
+		rampMeter(&n.pendingSize, maxPendingSize, nodeApplyChan)
 		var pendingSize int64
 		for _, p := range proposals {
 			pendingSize += int64(p.Size())
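
For readability, here is the generalized rampMeter from the hunks above, reassembled as a self-contained sketch. The glog call is swapped for the standard log package so the snippet runs on its own, and main is only an illustration of the blocking behavior, not part of the commit:

    package main

    import (
        "log"
        "sync/atomic"
        "time"
    )

    // rampMeter as introduced above: block the caller while *address exceeds
    // maxSize, re-checking every 3ms, and log if we stalled for more than 1s.
    func rampMeter(address *int64, maxSize int64, component string) {
        start := time.Now()
        defer func() {
            if dur := time.Since(start); dur > time.Second {
                log.Printf("Blocked pushing to %s for %v", component, dur.Round(time.Millisecond))
            }
        }()
        for {
            if atomic.LoadInt64(address) <= maxSize {
                return
            }
            time.Sleep(3 * time.Millisecond)
        }
    }

    func main() {
        var pending int64 = 128 << 20 // start above the 64MB cap
        // A background consumer drains pending bytes, as the executor does
        // when it finishes applying a batch of mutations.
        go func() {
            for atomic.LoadInt64(&pending) > 0 {
                atomic.AddInt64(&pending, -(16 << 20))
                time.Sleep(500 * time.Millisecond)
            }
        }()
        start := time.Now()
        rampMeter(&pending, 64<<20, "demo")
        log.Printf("unblocked after %v", time.Since(start).Round(time.Millisecond))
    }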

worker/executor.go

+17 −1
@@ -21,6 +21,7 @@ package worker
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 
 	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/posting"
@@ -35,6 +36,8 @@ type subMutation struct {
 }
 
 type executor struct {
+	pendingSize int64
+
 	sync.RWMutex
 	predChan map[string]chan *subMutation
 	closer   *y.Closer
@@ -54,8 +57,10 @@ func (e *executor) processMutationCh(ch chan *subMutation) {
 
 	writer := posting.NewTxnWriter(pstore)
 	for payload := range ch {
+		var esize int64
 		ptxn := posting.NewTxn(payload.startTs)
 		for _, edge := range payload.edges {
+			esize += int64(edge.Size())
 			for {
 				err := runMutation(payload.ctx, edge, ptxn)
 				if err == nil {
@@ -74,6 +79,8 @@ func (e *executor) processMutationCh(ch chan *subMutation) {
 		if err := writer.Wait(); err != nil {
 			glog.Errorf("Error while waiting for writes: %v", err)
 		}
+
+		atomic.AddInt64(&e.pendingSize, -esize)
 	}
 }
 
@@ -99,9 +106,16 @@ func (e *executor) getChannelUnderLock(pred string) (ch chan *subMutation) {
 	return ch
 }
 
+const (
+	maxPendingEdgesSize int64 = 64 << 20
+	executorAddEdges          = "executor.addEdges"
+)
+
 func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) {
-	payloadMap := make(map[string]*subMutation)
+	rampMeter(&e.pendingSize, maxPendingEdgesSize, executorAddEdges)
 
+	payloadMap := make(map[string]*subMutation)
+	var esize int64
 	for _, edge := range edges {
 		payload, ok := payloadMap[edge.Attr]
 		if !ok {
@@ -112,6 +126,7 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
 			payload = payloadMap[edge.Attr]
 		}
 		payload.edges = append(payload.edges, edge)
+		esize += int64(edge.Size())
 	}
 
 	// Lock() in case the channel gets closed from underneath us.
@@ -127,4 +142,5 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
 		}
 	}
 
+	atomic.AddInt64(&e.pendingSize, esize)
 }
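
Taken together, the executor accounting has three touch points: addEdges blocks in rampMeter while too many edge bytes are in flight, adds the batch's size once its edges are queued, and processMutationCh subtracts that size after the batch has been applied. Below is a minimal, self-contained sketch of that producer/consumer pattern; the executor type, channel, and batch sizes here are hypothetical stand-ins for illustration, not dgraph code:

    package main

    import (
        "log"
        "sync/atomic"
        "time"
    )

    const maxPendingEdgesSize int64 = 64 << 20

    type executor struct {
        pendingSize int64
        ch          chan int64 // stand-in for the per-predicate subMutation channels
    }

    // addEdges-style producer: throttle first, then enqueue and account.
    func (e *executor) addEdges(batchSize int64) {
        for atomic.LoadInt64(&e.pendingSize) > maxPendingEdgesSize {
            time.Sleep(3 * time.Millisecond) // rampMeter's waiting loop
        }
        e.ch <- batchSize
        atomic.AddInt64(&e.pendingSize, batchSize)
    }

    // processMutationCh-style consumer: apply the batch, then release its bytes.
    func (e *executor) process() {
        for size := range e.ch {
            time.Sleep(10 * time.Millisecond) // simulate applying mutations
            atomic.AddInt64(&e.pendingSize, -size)
        }
    }

    func main() {
        e := &executor{ch: make(chan int64, 128)}
        done := make(chan struct{})
        go func() { e.process(); close(done) }()
        for i := 0; i < 20; i++ {
            e.addEdges(8 << 20) // 8MB batches; backpressure kicks in near the 64MB cap
        }
        close(e.ch)
        <-done
        log.Println("pending bytes after drain:", atomic.LoadInt64(&e.pendingSize))
    }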
