Use rampMeter for Executor (#5503)
Currently we have a rampMeter that runs while applying proposals. It throttles the memory used
by the mutations contained in the proposals waiting to be applied. However, this works
correctly only in normal mode.
In ludicrous mode, mutations for these proposals are applied asynchronously, so the above
rampMeter doesn't work as expected, and memory usage grows large when there are many
proposals to apply.
This PR introduces a rampMeter for the executor as well. The executor is responsible for applying
mutations in ludicrous mode.
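
To illustrate the mechanism, here is a minimal, self-contained sketch of the throttling pattern this PR generalizes. It is not the Dgraph code (the actual changes are in the diffs below); the names pending and maxPending and the batch sizes are invented for illustration. A producer reserves bytes before queueing work, a consumer releases them once the work is applied, and rampMeter blocks the producer whenever the outstanding size exceeds the cap:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// pending tracks the total size (in bytes) of work queued but not yet applied.
var pending int64

const maxPending int64 = 1 << 20 // 1 MB cap, chosen arbitrarily for this sketch.

// rampMeter blocks until the pending size drops back under maxSize.
func rampMeter(addr *int64, maxSize int64) {
	for {
		if atomic.LoadInt64(addr) <= maxSize {
			return
		}
		time.Sleep(3 * time.Millisecond)
	}
}

func main() {
	work := make(chan int64, 16)
	var wg sync.WaitGroup

	// Consumer: applies work asynchronously and releases its size afterwards,
	// analogous to the executor decrementing pendingSize after a sub-mutation.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for sz := range work {
			time.Sleep(5 * time.Millisecond) // pretend to apply the mutation
			atomic.AddInt64(&pending, -sz)
		}
	}()

	// Producer: throttles before reserving more size, analogous to addEdges.
	for i := 0; i < 100; i++ {
		rampMeter(&pending, maxPending)
		sz := int64(256 << 10) // each batch is 256 KB
		atomic.AddInt64(&pending, sz)
		work <- sz
	}
	close(work)
	wg.Wait()
	fmt.Println("done; pending bytes:", atomic.LoadInt64(&pending))
}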

Testing:
I tested this PR by running the live loader on the 21M dataset in ludicrous mode. With this PR the live loader completes in ~5-6 minutes. However, when I changed maxPendingEdgesSize to 64KB (instead of the current 64MB), the live loader completed in ~10-11 minutes.

Benchmarking:
I benchmarked using data generated by the script below:

package main

import (
  "bytes"
  "fmt"
  "os"
)

var (
  total int = 100000000
  pred      = 1024
)

func main() {
  f, err := os.OpenFile("test.rdf", os.O_CREATE|os.O_RDWR, 0755)
  if err != nil {
    panic(err)
  }
  defer f.Close()

  totalPerPred := total / pred

  buf := bytes.NewBuffer(nil)
  count := 1
  for i := 0; i < totalPerPred; i++ {
    for j := 0; j < pred; j++ {
      rec := fmt.Sprintf(`_:record_%d <pred_%d> "value_%d" .`, count, j, count)
      buf.WriteString(rec)
      buf.WriteString("\n")
      count++
      if count%100000 == 0 {
        buf.WriteTo(f)
        buf.Reset()
      }
    }
  }

  buf.WriteTo(f)
  if err := f.Sync(); err != nil {
    panic(err)
  }
  fmt.Println("Done writing to file: ", count)
}
The above script generates ~100M records across 1024 predicates, i.e. ~97K records per predicate.
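
As a quick sanity check on those numbers (a throwaway calculation, not part of the PR), the per-predicate count and the rough size of the generated file work out as follows; the ~48-byte average line length is only an estimate based on the record format above:

package main

import "fmt"

func main() {
	total := 100000000 // records generated by the script
	preds := 1024      // predicates

	fmt.Println("records per predicate:", total/preds) // 97656, i.e. ~97K

	// Each line looks like `_:record_12345678 <pred_123> "value_12345678" .`,
	// roughly 48 bytes, so expect the RDF file to be on the order of 4-5 GB.
	fmt.Printf("approximate file size: %.1f GB\n", float64(total)*48/(1<<30))
}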

Master:
Time to finish live loader: 8m44s
Alpha RAM(RES) usage: 10.9 GB

This PR with maxPendingEdgesSize = 64KB:
Time to finish live loader: 8m48s
Alpha RAM(RES) usage: 9.6 GB

This PR with maxPendingEdgesSize = 64MB:
Time to finish live loader: 8m32s
Alpha RAM(RES) usage: 10.5 GB

(cherry picked from commit 4735952)
ashish-goswami committed Jun 10, 2020
1 parent dfca80d commit 76d2d52
Showing 2 changed files with 25 additions and 6 deletions.
13 changes: 8 additions & 5 deletions worker/draft.go
@@ -809,17 +809,20 @@ func (n *node) proposeSnapshot(discardN int) error {
 	return n.Raft().Propose(n.ctx, data)
 }
 
-const maxPendingSize int64 = 64 << 20 // in bytes.
+const (
+	maxPendingSize int64 = 64 << 20 // in bytes.
+	nodeApplyChan        = "raft node applyCh"
+)
 
-func (n *node) rampMeter() {
+func rampMeter(address *int64, maxSize int64, component string) {
 	start := time.Now()
 	defer func() {
 		if dur := time.Since(start); dur > time.Second {
-			glog.Infof("Blocked pushing to applyCh for %v", dur.Round(time.Millisecond))
+			glog.Infof("Blocked pushing to %s for %v", component, dur.Round(time.Millisecond))
 		}
 	}()
 	for {
-		if atomic.LoadInt64(&n.pendingSize) <= maxPendingSize {
+		if atomic.LoadInt64(address) <= maxSize {
 			return
 		}
 		time.Sleep(3 * time.Millisecond)
@@ -1128,7 +1131,7 @@ func (n *node) Run() {
 		// Apply the meter this before adding size to pending size so some crazy big
 		// proposal can be pushed to applyCh. If this do this after adding its size to
 		// pending size, we could block forever in rampMeter.
-		n.rampMeter()
+		rampMeter(&n.pendingSize, maxPendingSize, nodeApplyChan)
 		var pendingSize int64
 		for _, p := range proposals {
 			pendingSize += int64(p.Size())
18 changes: 17 additions & 1 deletion worker/executor.go
@@ -21,6 +21,7 @@ package worker
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 
 	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/posting"
@@ -35,6 +36,8 @@ type subMutation struct {
 }
 
 type executor struct {
+	pendingSize int64
+
 	sync.RWMutex
 	predChan map[string]chan *subMutation
 	closer   *y.Closer
@@ -54,8 +57,10 @@ func (e *executor) processMutationCh(ch chan *subMutation) {
 
 	writer := posting.NewTxnWriter(pstore)
 	for payload := range ch {
+		var esize int64
 		ptxn := posting.NewTxn(payload.startTs)
 		for _, edge := range payload.edges {
+			esize += int64(edge.Size())
 			for {
				err := runMutation(payload.ctx, edge, ptxn)
				if err == nil {
@@ -74,6 +79,8 @@ func (e *executor) processMutationCh(ch chan *subMutation) {
 		if err := writer.Wait(); err != nil {
 			glog.Errorf("Error while waiting for writes: %v", err)
 		}
+
+		atomic.AddInt64(&e.pendingSize, -esize)
 	}
 }
 
@@ -99,9 +106,16 @@ func (e *executor) getChannelUnderLock(pred string) (ch chan *subMutation) {
 	return ch
 }
 
+const (
+	maxPendingEdgesSize int64 = 64 << 20
+	executorAddEdges          = "executor.addEdges"
+)
+
 func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) {
-	payloadMap := make(map[string]*subMutation)
+	rampMeter(&e.pendingSize, maxPendingEdgesSize, executorAddEdges)
 
+	payloadMap := make(map[string]*subMutation)
+	var esize int64
 	for _, edge := range edges {
 		payload, ok := payloadMap[edge.Attr]
 		if !ok {
@@ -112,6 +126,7 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) {
 			payload = payloadMap[edge.Attr]
 		}
 		payload.edges = append(payload.edges, edge)
+		esize += int64(edge.Size())
 	}
 
 	// Lock() in case the channel gets closed from underneath us.
@@ -127,4 +142,5 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) {
 		}
 	}
 
+	atomic.AddInt64(&e.pendingSize, esize)
 }
