Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental Rollup and Tablet Size Calculation #4972

Merged
merged 25 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5f07dc9
Bring incremental rollup commit
parasssh Jan 9, 2020
75233b5
Incremental Rollups -- let's track versions of keys.
manishrjain Mar 16, 2020
76d2838
Merge master
manishrjain Mar 16, 2020
fe10d20
Build works with master merge
manishrjain Mar 16, 2020
527a082
Fix map ini
manishrjain Mar 17, 2020
50e7544
Some evidence to show it's not just writes, but write in the range of…
manishrjain Mar 17, 2020
700b54b
Some more tries to understand the behavior
manishrjain Mar 17, 2020
53b1ae5
Merge branch 'master' into mrjn/incremental
Mar 18, 2020
55de90c
Using Martin's fix.
manishrjain Mar 18, 2020
eabfe9a
Make rollup goroutine work with the task tracking
manishrjain Mar 18, 2020
3bbde8c
Cleaned up a bit of the log messages by using strings and converting …
manishrjain Mar 18, 2020
401d6a7
Merge branch 'master' into mrjn/incremental
Mar 18, 2020
7f2a9d7
Merge branch 'mrjn/incremental' of https://github.com/dgraph-io/dgrap…
Mar 18, 2020
1656b38
Fix an issue where we were only sending the keys for rollup which did…
manishrjain Mar 18, 2020
ace03ac
Switch tablet size calculation to use DB.Tables method.
manishrjain Mar 19, 2020
000727f
Ensure that when closing, all registered tasks are properly stopped.
manishrjain Mar 19, 2020
2743576
Revert unnecessary change in list.go
manishrjain Mar 19, 2020
ea1081b
Better handle if another task is going on.
manishrjain Mar 19, 2020
280c634
Call rollup even if we find one delta posting
manishrjain Mar 19, 2020
42fb5e8
Add a note about iterating over tables
manishrjain Mar 19, 2020
71d89dd
fix GoLang CI comments
Mar 19, 2020
a4a7c59
Merge branch 'mrjn/incremental' of https://github.com/dgraph-io/dgrap…
Mar 19, 2020
d4cea37
Revert CI changes done by Paras. Add comment about why.
manishrjain Mar 19, 2020
b4a07c0
Remove question, add comment
manishrjain Mar 19, 2020
50e5b72
Merge master
manishrjain Mar 19, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200316175624-91c31ebe8c22
github.com/dgraph-io/dgo/v2 v2.2.0
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgryski/go-farm v0.0.0-20191112170834-c2139c5d712b
github.com/dgryski/go-groupvarint v0.0.0-20190318181831-5ce5df8ca4e1
Expand Down
107 changes: 107 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,121 @@ import (
"encoding/hex"
"math"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/badger/v2"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
)

// incrRollupi is used to batch keys for rollup incrementally.
type incrRollupi struct {
	// count is the number of keys rolled up so far. It is updated with
	// atomic.AddUint64, so it is kept as the first field: on 32-bit platforms
	// only the first word of an allocated struct is guaranteed to be 64-bit
	// aligned, and this placement keeps that guarantee if fields are added
	// or reordered below.
	count uint64
	// keysCh carries batches of 64 keys that need to be rolled up; the
	// batches are filled during reads.
	keysCh chan *[][]byte
	// keysPool is the sync.Pool used to share and reuse the key batches.
	keysPool *sync.Pool
}

var (
	// ErrTsTooOld is the error returned when a transaction is too old to be applied.
	ErrTsTooOld = errors.Errorf("Transaction is too old")
	// ErrInvalidKey is the error returned when a posting list is read with an
	// invalid key, e.g. the key of a single part of a larger multi-part list.
	ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key")

	// IncrRollup is the shared batcher that collects keys for incremental rollup.
	// Its channel is unbuffered and its pool hands out pointers to empty key slices.
	IncrRollup = &incrRollupi{
		keysPool: &sync.Pool{
			New: func() interface{} {
				var batch [][]byte
				return &batch
			},
		},
		keysCh: make(chan *[][]byte),
	}
)

// rollUpKey loads the posting list for key via GetNoStore, rolls it up and
// hands the resulting KVs to writer. Any error from the lookup, the rollup or
// the write is returned unchanged.
func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
	pl, err := GetNoStore(key)
	if err != nil {
		return err
	}

	rolled, err := pl.Rollup()
	if err != nil {
		return err
	}

	// The counter is only maintained at verbosity 2 and above; a progress
	// line is logged once every logEvery rollups.
	if glog.V(2) {
		const logEvery = uint64(1000)
		total := atomic.AddUint64(&ir.count, 1)
		if total%logEvery == 0 {
			glog.V(2).Infof("Rolled up %d keys", total)
		}
	}

	list := &bpb.KVList{Kv: rolled}
	return writer.Write(list)
}

// addKeyToBatch appends key to a batch taken from keysPool. While the batch
// holds fewer than 64 keys it goes straight back to the pool. Once it reaches
// 64 keys, a non-blocking send on keysCh is attempted; if nobody is receiving,
// the batch is emptied and returned to the pool, so those keys are dropped.
func (ir *incrRollupi) addKeyToBatch(key []byte) {
	pending := ir.keysPool.Get().(*[][]byte)
	*pending = append(*pending, key)

	if len(*pending) >= 64 {
		select {
		case ir.keysCh <- pending:
			// The receiver now owns the batch.
		default:
			// Drop keys and build the batch again. Lossy behavior.
			*pending = (*pending)[:0]
			ir.keysPool.Put(pending)
		}
		return
	}

	// Not full yet: park the batch in the pool for the next caller.
	ir.keysPool.Put(pending)
}

// Process rolls up the batches of keys received on keysCh. It is meant to run
// in its own goroutine and returns once closer has been signalled, calling
// closer.Done and flushing the writer on the way out.
//
// A key is rolled up at most once every 10 seconds, and at most one batch is
// processed per 100ms tick.
func (ir *incrRollupi) Process(closer *y.Closer) {
	defer closer.Done()

	writer := NewTxnWriter(pstore)
	defer writer.Flush()

	// m maps hash(key) to the Unix time of the key's last rollup. The hash is
	// stored instead of the key to limit the size of the map.
	// NOTE(review): entries are never evicted, so the map grows with the
	// number of distinct keys seen — consider periodic cleanup.
	m := make(map[uint64]int64)

	limiter := time.NewTicker(100 * time.Millisecond)
	// Release the ticker's resources when this goroutine exits.
	defer limiter.Stop()

	for {
		select {
		case <-closer.HasBeenClosed():
			return
		case batch := <-ir.keysCh:
			currTs := time.Now().Unix()
			for _, key := range *batch {
				hash := z.MemHash(key)
				if elem := m[hash]; currTs-elem >= 10 {
					// Key not present or Key present but last roll up was more than 10 sec ago.
					// Add/Update map and rollup.
					m[hash] = currTs
					if err := ir.rollUpKey(writer, key); err != nil {
						glog.Warningf("Error %v rolling up key %v\n", err, key)
					}
				}
			}
			// Clear the batch and put it back in the sync.Pool for reuse.
			*batch = (*batch)[:0]
			ir.keysPool.Put(batch)

			// Throttle to 1 batch = 64 rollups per 100 ms, but stop waiting
			// as soon as shutdown is requested.
			select {
			case <-closer.HasBeenClosed():
				return
			case <-limiter.C:
			}
		}
	}
}

// ShouldAbort returns whether the transaction should be aborted.
func (txn *Txn) ShouldAbort() bool {
if txn == nil {
Expand Down Expand Up @@ -166,6 +264,14 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
l.key = key
l.plist = new(pb.PostingList)

// We use the following block of code to trigger incremental rollup on this key.
deltaCount := 0
defer func() {
if deltaCount > 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we want to do this if deltaCount >= 2?

IncrRollup.addKeyToBatch(key)
}
}()

// Iterates from highest Ts to lowest Ts
for it.Valid() {
item := it.Item()
Expand Down Expand Up @@ -210,6 +316,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
if err != nil {
return nil, err
}
deltaCount++
case BitSchemaPosting:
return nil, errors.Errorf(
"Trying to read schema in ReadPostingList for key: %s", hex.Dump(key))
Expand Down
Loading