Skip to content

Commit

Permalink
Fix wal replay issue during rollup (#8774)
Browse files Browse the repository at this point in the history
We have confirmation that rollups can cause issues during wal replay.
This diff fixes the issue by issuing each incremental rollup a new timestamp
at which the rollup will be written. We wait until timestamp - 1 has been
applied before reading the data, and then write the rollup at the max timestamp + 1.

Performance implications:
Live loader before:
Number of TXs run : 21240
Number of N-Quads processed  : 21239870
Time spent                   : 10m26.664095134s
N-Quads processed per second : 33929

Live Loader after the changes:
Number of TXs run : 21240
Number of N-Quads processed  : 21239870
Time spent                   : 10m23.564645632s
N-Quads processed per second : 34312

Negligible difference in time taken to upload the 21 million dataset.
  • Loading branch information
harshil-goel authored Apr 10, 2023
1 parent 1643363 commit f64e8a6
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 31 deletions.
5 changes: 4 additions & 1 deletion dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,11 @@ func (r *reducer) toList(req *encodeRequest) {
shouldSplit := pl.Size() > (1<<20)/2 && len(pl.Pack.Blocks) > 1
if shouldSplit {
// Give ownership of pl.Pack away to list. Rollup would deallocate the Pack.
// We do rollup at math.MaxUint64 so that we don't change the allocated
// timestamp of the posting list. The posting list originally is written
// at writeVersionTs, we don't want to change that in rollup.
l := posting.NewList(y.Copy(currentKey), pl, writeVersionTs)
kvs, err := l.Rollup(nil)
kvs, err := l.Rollup(nil, math.MaxUint64)
x.Check(err)

// Assign a new allocator, so we don't reset the one we were using during Rollup.
Expand Down
3 changes: 2 additions & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@ func rollupKey(db *badger.DB) {
alloc := z.NewAllocator(32<<20, "Debug.RollupKey")
defer alloc.Release()

kvs, err := pl.Rollup(alloc)
// Keep kvs at their original version, as we can't reserve a new timestamp in debug mode.
kvs, err := pl.Rollup(alloc, math.MaxUint64)
x.Check(err)

wb := db.NewManagedWriteBatch()
Expand Down
5 changes: 3 additions & 2 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,9 @@ func (r *rebuilder) Run(ctx context.Context) error {
}
// No need to write a loop after ReadPostingList to skip unread entries
// for a given key because we only wrote BitDeltaPosting to temp badger.

kvs, err := l.Rollup(nil)
// We can write the data at their original timestamp in pstore badger.
// We do the rollup at MaxUint64 so that we don't change the timestamp of resulting list.
kvs, err := l.Rollup(nil, math.MaxUint64)
if err != nil {
return nil, err
}
Expand Down
33 changes: 31 additions & 2 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,10 +808,35 @@ func (l *List) Length(readTs, afterUid uint64) int {
// The first part of a multi-part list always has start UID 1 and will be the last part
// to be deleted, at which point the entire list will be marked for deletion.
// As the list grows, existing parts might be split if they become too big.
func (l *List) Rollup(alloc *z.Allocator) ([]*bpb.KV, error) {
//
// You can provide a readTs for Rollup. This should either be math.MaxUint64, or it should be a
// timestamp that was reserved for rollup. This would ensure that we read only till that time.
// If readTs is provided, once the rollup is done, we check the maximum timestamp. We store the
// results at that max timestamp + 1. This mechanism allows us to make sure that
//
// - Since we write at max timestamp + 1, we can side step any issues that arise by wal replay.
//
//  - Earlier, one of the solutions was to write at ts + 1. It didn't work as index transactions
//    don't conflict, so they can get committed at consecutive timestamps.
//    This leads to some data being overwritten by rollup.
//
//  - No other transaction happens at readTs. This way we can be sure that we won't overwrite
// any transaction that happened.
//
// - Latest data. We wait until readTs - 1, so that we know that we are reading the latest data.
//    If we read stale data, it can cause some old transactions to be deleted.
//
// - Even though we have reserved readTs for rollup, we don't store the data there. This is done
// so that the rollup is written as close as possible to actual data. This can cause issues
// if someone is reading data between two timestamps.
//
// - Drop operation can cause issues if they are rolled up. Since we are storing results at ts + 1,
//    in dgraph.drop.op. If some operations were done then, they would be overwritten.
// This won't happen as the transactions should conflict out.
func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) {
l.RLock()
defer l.RUnlock()
out, err := l.rollup(math.MaxUint64, true)
out, err := l.rollup(readTs, true)
if err != nil {
return nil, errors.Wrapf(err, "failed when calling List.rollup")
}
Expand All @@ -823,6 +848,10 @@ func (l *List) Rollup(alloc *z.Allocator) ([]*bpb.KV, error) {
var kvs []*bpb.KV
kv := MarshalPostingList(out.plist, alloc)
kv.Version = out.newMinTs
if readTs != math.MaxUint64 {
kv.Version += 1
}

kv.Key = alloc.Copy(l.key)
kvs = append(kvs, kv)

Expand Down
67 changes: 50 additions & 17 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,39 @@ func TestAddMutation_mrjn1(t *testing.T) {
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
}

// TestRollupMaxTsIsSet verifies that when Rollup is given a reserved readTs,
// the resulting KVs are versioned at the maximum commit timestamp + 1 (commits
// happen at i+1, so the rollup must land at i+2), rather than at the original
// posting-list timestamp.
func TestRollupMaxTsIsSet(t *testing.T) {
	defer setMaxListSize(maxListSize)
	maxListSize = math.MaxInt32

	key := x.DataKey(x.GalaxyAttr("bal"), 1333)
	ol, err := getNew(key, ps, math.MaxUint64)
	require.NoError(t, err)
	N := int(1e6)
	for i := 2; i <= N; i += 2 {
		edge := &pb.DirectedEdge{
			ValueId: uint64(i),
		}
		txn := Txn{StartTs: uint64(i)}
		addMutationHelper(t, ol, edge, Set, &txn)
		require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
		if i%10000 == 0 {
			// Do a rollup, otherwise, it gets too slow to add a million mutations
			// to one posting list.
			t.Logf("Start Ts: %d. Rolling up posting list.\n", txn.StartTs)
			kvs, err := ol.Rollup(nil, uint64(N+i))
			// Check the error before touching kvs: on failure kvs may be nil.
			require.NoError(t, err)
			for _, kv := range kvs {
				// Latest commit was at i+1, so the rollup version must be i+2.
				require.Equal(t, uint64(i)+2, kv.Version)
			}
			require.NoError(t, writePostingListToDisk(kvs))
			ol, err = getNew(key, ps, math.MaxUint64)
			require.NoError(t, err)
		}
	}
}

func TestMillion(t *testing.T) {
// Ensure list is stored in a single part.
defer setMaxListSize(maxListSize)
Expand All @@ -456,7 +489,7 @@ func TestMillion(t *testing.T) {
// Do a rollup, otherwise, it gets too slow to add a million mutations to one posting
// list.
t.Logf("Start Ts: %d. Rolling up posting list.\n", txn.StartTs)
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -890,7 +923,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
if i%2000 == 0 {
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand All @@ -899,7 +932,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
commits++
}

kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
for _, kv := range kvs {
require.Equal(t, uint64(size+1), kv.Version)
Expand Down Expand Up @@ -933,7 +966,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
if i%2000 == 0 {
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand All @@ -954,7 +987,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
addMutationHelper(t, ol, edge, Del, &txn)
require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1))
if i%2000 == 0 {
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -983,7 +1016,7 @@ func TestLargePlistSplit(t *testing.T) {
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
}
_, err = ol.Rollup(nil)
_, err = ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)

ol, err = getNew(key, ps, math.MaxUint64)
Expand All @@ -1002,7 +1035,7 @@ func TestLargePlistSplit(t *testing.T) {
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
}

kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -1092,7 +1125,7 @@ func TestBinSplit(t *testing.T) {
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
}

kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
for _, kv := range kvs {
require.Equal(t, uint64(size+1), kv.Version)
Expand Down Expand Up @@ -1179,7 +1212,7 @@ func TestMultiPartListMarshal(t *testing.T) {
size := int(1e5)
ol, _ := createMultiPartList(t, size, false)

kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.Equal(t, len(kvs), len(ol.plist.Splits)+1)
require.NoError(t, writePostingListToDisk(kvs))
Expand Down Expand Up @@ -1207,7 +1240,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) {
size := int(1e5)
originalList, commits := createMultiPartList(t, size, false)

kvs, err := originalList.Rollup(nil)
kvs, err := originalList.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.Equal(t, len(kvs), len(originalList.plist.Splits)+1)

Expand Down Expand Up @@ -1240,7 +1273,7 @@ func TestMultiPartListDelete(t *testing.T) {
}))
require.Equal(t, 0, counter)

kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.Equal(t, len(kvs), 1)

Expand Down Expand Up @@ -1272,7 +1305,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
if i%2000 == 0 {
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand All @@ -1299,7 +1332,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
addMutationHelper(t, ol, edge, Del, &txn)
require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1))
if i%2000 == 0 {
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand All @@ -1308,7 +1341,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
}

// Rollup list at the end of all the deletions.
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -1336,7 +1369,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1))

if i%2000 == 0 {
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand All @@ -1345,7 +1378,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
}

// Rollup list at the end of all the additions
kvs, err = ol.Rollup(nil)
kvs, err = ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -1421,7 +1454,7 @@ func TestRecursiveSplits(t *testing.T) {
}

// Rollup the list. The final output should have more than two parts.
kvs, err := ol.Rollup(nil)
kvs, err := ol.Rollup(nil, math.MaxUint64)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand Down
39 changes: 35 additions & 4 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,17 @@ type pooledKeys struct {

// incrRollupi is used to batch keys for rollup incrementally.
type incrRollupi struct {
// We are using 2 priorities with now, idx 0 represents the high priority keys to be rolled up
// while idx 1 represents low priority keys to be rolled up.
// We are using 2 priorities for now; idx 0 represents the high priority keys to be rolled
// up while idx 1 represents low priority keys to be rolled up.
priorityKeys []*pooledKeys
count uint64

// Get Timestamp function gets a new timestamp to store the rollup at. This makes sure that
// we are not overwriting any transaction. If there are transactions that are ongoing,
// which modify the item, rollup wouldn't affect the data, as a delta would be written
// later on
getNewTs func(bool) uint64
closer *z.Closer
}

var (
Expand All @@ -58,6 +65,8 @@ var (
// ErrInvalidKey is returned when trying to read a posting list using
// an invalid key (e.g the key to a single part of a larger multi-part list).
ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key")
// ErrHighPriorityOp is returned when rollup is cancelled so that operations could start.
ErrHighPriorityOp = errors.New("Cancelled rollup to make way for high priority operation")

// IncrRollup is used to batch keys for rollup incrementally.
IncrRollup = &incrRollupi{
Expand All @@ -81,12 +90,31 @@ func init() {

// rollUpKey takes the given key's posting lists, rolls it up and writes back to badger
func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
// Get a new non read only ts. This makes sure that no other txn would write at this
// ts, overwriting some data. Wait to read the Posting list until ts-1 have been applied
// to badger. This helps us prevent issues with wal replay, as we now have a timestamp
// where nothing was written to dgraph.
ts := ir.getNewTs(false)

// Get a wait channel from oracle. Can't use WaitFromTs as we also need to check if other
// operations need to start. If ok is not true, that means we have already passed the ts,
// and we don't need to wait.
waitCh, ok := o.addToWaiters(ts)
if ok {
select {
case <-ir.closer.HasBeenClosed():
return ErrHighPriorityOp

case <-waitCh:
}
}

l, err := GetNoStore(key, math.MaxUint64)
if err != nil {
return err
}

kvs, err := l.Rollup(nil)
kvs, err := l.Rollup(nil, ts)
if err != nil {
return err
}
Expand Down Expand Up @@ -123,7 +151,10 @@ func (ir *incrRollupi) addKeyToBatch(key []byte, priority int) {
}

// Process will rollup batches of 64 keys in a go routine.
func (ir *incrRollupi) Process(closer *z.Closer) {
func (ir *incrRollupi) Process(closer *z.Closer, getNewTs func(bool) uint64) {
ir.getNewTs = getNewTs
ir.closer = closer

defer closer.Done()

writer := NewTxnWriter(pstore)
Expand Down
Loading

0 comments on commit f64e8a6

Please sign in to comment.