Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OPTIMIZATION] Optimize snapshot creation #4901

Merged
merged 28 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1bb3475
Improve CreateSnapshot()
ashish-goswami Mar 5, 2020
ef4f46d
Temp
ashish-goswami Mar 6, 2020
0b4e96f
Temp2
ashish-goswami Mar 8, 2020
ff70fa1
Refactor
ashish-goswami Mar 9, 2020
8f4160e
Remove defer
ashish-goswami Mar 10, 2020
b01a1ec
Fix FirstIndex
ashish-goswami Mar 10, 2020
27ad2ff
Fix NumEntries()
ashish-goswami Mar 10, 2020
42c1b0d
Minor changes
ashish-goswami Mar 10, 2020
3ff5f9e
Remove changes from draft.go
ashish-goswami Mar 10, 2020
d90b255
Minor fix
ashish-goswami Mar 10, 2020
4a65e7f
Delete entries before first at start
ashish-goswami Mar 11, 2020
a7aa99d
Minor fix
ashish-goswami Mar 12, 2020
ae8b9be
Address some review comments
ashish-goswami Mar 13, 2020
887ab67
Temp
ashish-goswami Mar 17, 2020
8acc9be
Fix broken tests
ashish-goswami Mar 17, 2020
e77b238
Refactor code
ashish-goswami Mar 18, 2020
71c0d96
Minor change
ashish-goswami Mar 18, 2020
81d1d3f
Address review comments
ashish-goswami Mar 18, 2020
e4e46f8
Address review comment
ashish-goswami Mar 19, 2020
0be3eca
Merge remote-tracking branch 'origin/master' into ashish/snapshot
ashish-goswami Mar 19, 2020
964d32d
Merge remote-tracking branch 'origin/master' into ashish/snapshot
ashish-goswami Mar 20, 2020
abd0ad1
docs: Add Releases page. (#4954)
danielmai Mar 20, 2020
c15189f
docs: Set v1.2.2 as latest docs. (#4993)
danielmai Mar 20, 2020
cf43770
Add comment regarding stream IDs for split lists in bulk loader. (#4982)
martinmr Mar 20, 2020
ca86bb6
Update CHANGELOG for v20.03.0-beta.20200320. (#4994)
lgalatin Mar 20, 2020
8269220
Merge remote-tracking branch 'origin/master' into ashish/snapshot
ashish-goswami Mar 21, 2020
233ba82
Minor change
ashish-goswami Mar 23, 2020
2ff7547
Merge remote-tracking branch 'origin/master' into ashish/snapshot
ashish-goswami Apr 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conn/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestProposal(t *testing.T) {
db, err := badger.Open(badger.DefaultOptions(dir))
require.NoError(t, err)
store := raftwal.Init(db, 0, 0)
defer store.Closer.SignalAndWait()

rc := &pb.RaftContext{Id: 1}
n := NewNode(rc, store)
Expand Down
5 changes: 4 additions & 1 deletion dgraph/cmd/debug/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,14 @@ func handleWal(db *badger.DB) error {
store := raftwal.Init(db, rid, gid)
switch {
case len(opt.wsetSnapshot) > 0:
return overwriteSnapshot(db, store)
err := overwriteSnapshot(db, store)
store.Closer.SignalAndWait()
return err

default:
printRaft(db, store)
}
store.Closer.SignalAndWait()
}
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ func run() {
_ = httpListener.Close()
// Stop Raft.
st.node.closer.SignalAndWait()
// Stop Raft store.
store.Closer.SignalAndWait()
// Stop all internal requests.
_ = grpcListener.Close()
st.node.trySnapshot(0)
Expand Down
97 changes: 81 additions & 16 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/gogo/protobuf/proto"
Expand All @@ -40,15 +41,30 @@ type DiskStorage struct {
gid uint32
elog trace.EventLog

cache *sync.Map
cache *sync.Map
Closer *y.Closer
indexRangeChan chan indexRange
}

type indexRange struct {
from, until uint64 // index range for deletion, until index is not deleted.
}

// Init returns a properly initialized instance of DiskStorage.
// To gracefully shut down DiskStorage, call store.Closer.SignalAndWait().
func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
w := &DiskStorage{db: db, id: id, gid: gid, cache: new(sync.Map)}
w := &DiskStorage{db: db,
id: id,
gid: gid,
cache: new(sync.Map),
Closer: y.NewCloser(1),
indexRangeChan: make(chan indexRange, 16),
}
if prev, err := RaftId(db); err != nil || prev != id {
x.Check(w.StoreRaftId(id))
}
go w.processIndexRange()

w.elog = trace.NewEventLog("Badger", "RaftStorage")

snap, err := w.Snapshot()
Expand All @@ -57,16 +73,54 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
return w
}

_, err = w.FirstIndex()
first, err := w.FirstIndex()
if err == errNotFound {
ents := make([]raftpb.Entry, 1)
x.Check(w.reset(ents))
} else {
x.Check(err)
}

// If db is not closed properly, there might be index ranges for which delete entries are not
// inserted. So insert delete entries for those ranges starting from 0 to (first-1).
w.indexRangeChan <- indexRange{0, first - 1}

return w
}

// processIndexRange runs in its own goroutine and consumes indexRange values
// from indexRangeChan, deleting the corresponding raft entries from Badger in
// the background. It exits once Closer is signalled, then drains any pending
// ranges before returning, so no queued deletions are lost on shutdown.
func (w *DiskStorage) processIndexRange() {
	defer w.Closer.Done()

	// processSingleRange deletes the entries in r via a write batch. Errors are
	// logged and swallowed: entry deletion is best-effort cleanup, and any range
	// missed here is retried on the next startup (see Init).
	processSingleRange := func(r indexRange) {
		batch := w.db.NewWriteBatch()
		if err := w.deleteRange(batch, r.from, r.until); err != nil {
			glog.Errorf("deleteRange failed with error: %v, from: %d, until: %d\n",
				err, r.from, r.until)
		}
		if err := batch.Flush(); err != nil {
			glog.Errorf("processIndexRange batch flush failed with error: %v\n", err)
		}
	}

loop:
	for {
		select {
		case r := <-w.indexRangeChan:
			processSingleRange(r)
		case <-w.Closer.HasBeenClosed():
			break loop
		}
	}

	// As we have already shutdown the node, it is safe to close indexRangeChan.
	// node.processApplyChan() calls CreateSnapshot, which internally sends values on this chan.
	close(w.indexRangeChan)

	// Drain whatever was queued before the channel was closed.
	for r := range w.indexRangeChan {
		processSingleRange(r)
	}
}

var idKey = []byte("raftid")

// RaftId reads the given badger store and returns the stored RAFT ID.
Expand Down Expand Up @@ -243,12 +297,13 @@ var (
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot).
func (w *DiskStorage) FirstIndex() (uint64, error) {
if val, ok := w.cache.Load(snapshotKey); ok {
snap, ok := val.(*raftpb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return snap.Metadata.Index + 1, nil
}
// We are deleting index ranges in the background after taking a snapshot, so we should check
// for the last snapshot in the WAL (Badger) if it is not found in the cache. If no snapshot
// is found, then we can check firstKey.
if snap, err := w.Snapshot(); err == nil && !raft.IsEmptySnap(snap) {
return snap.Metadata.Index + 1, nil
}

if val, ok := w.cache.Load(firstKey); ok {
if first, ok := val.(uint64); ok {
return first, nil
Expand Down Expand Up @@ -276,11 +331,11 @@ func (w *DiskStorage) LastIndex() (uint64, error) {
return w.seekEntry(nil, math.MaxUint64, true)
}

// Delete all entries from [0, until), i.e. excluding until.
// Delete all entries from [from, until), i.e. excluding until.
// Keep the entry at the snapshot index, for simplification of logic.
// It is the application's responsibility to not attempt to deleteUntil an index
// It is the application's responsibility to not attempt to deleteRange an index
// greater than raftLog.applied.
func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error {
func (w *DiskStorage) deleteRange(batch *badger.WriteBatch, from, until uint64) error {
var keys []string
err := w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
Expand All @@ -289,7 +344,7 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error
itr := txn.NewIterator(opt)
defer itr.Close()

start := w.EntryKey(0)
start := w.EntryKey(from)
first := true
var index uint64
for itr.Seek(start); itr.Valid(); itr.Next() {
Expand Down Expand Up @@ -488,15 +543,19 @@ func (w *DiskStorage) InitialState() (hs raftpb.HardState, cs raftpb.ConfState,

// NumEntries returns the number of entries in the write-ahead log.
func (w *DiskStorage) NumEntries() (int, error) {
first, err := w.FirstIndex()
if err != nil {
return 0, err
}
var count int
err := w.db.View(func(txn *badger.Txn) error {
err = w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
opt.Prefix = w.entryPrefix()
itr := txn.NewIterator(opt)
defer itr.Close()

start := w.EntryKey(0)
start := w.EntryKey(first)
for itr.Seek(start); itr.Valid(); itr.Next() {
count++
}
Expand Down Expand Up @@ -622,10 +681,16 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte
if err := w.setSnapshot(batch, &snap); err != nil {
return err
}
if err := w.deleteUntil(batch, snap.Metadata.Index); err != nil {

if err := batch.Flush(); err != nil {
return err
}
return batch.Flush()

// deleteRange deletes all entries in the range except the last one (which is the snapshot
// index), and the first index is lastSnapshotIndex+1, hence the start index for indexRange
// should be (first-1).
// TODO: If deleteRangeChan is full, it might block mutations.
w.indexRangeChan <- indexRange{first - 1, snap.Metadata.Index}
return nil
}

// Save would write Entries, HardState and Snapshot to persistent storage in order, i.e. Entries
Expand Down
13 changes: 11 additions & 2 deletions raftwal/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestStorageTerm(t *testing.T) {
db, err := badger.Open(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()

ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
tests := []struct {
Expand Down Expand Up @@ -103,6 +104,7 @@ func TestStorageEntries(t *testing.T) {
db, err := badger.Open(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()

ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}
tests := []struct {
Expand Down Expand Up @@ -148,6 +150,7 @@ func TestStorageLastIndex(t *testing.T) {
db, err := badger.Open(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()

ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
require.NoError(t, ds.reset(ents))
Expand Down Expand Up @@ -178,6 +181,7 @@ func TestStorageFirstIndex(t *testing.T) {
db, err := badger.Open(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()

ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
require.NoError(t, ds.reset(ents))
Expand All @@ -191,7 +195,7 @@ func TestStorageFirstIndex(t *testing.T) {
}

batch := db.NewWriteBatch()
require.NoError(t, ds.deleteUntil(batch, 4))
require.NoError(t, ds.deleteRange(batch, 0, 4))
require.NoError(t, batch.Flush())
ds.cache.Store(firstKey, 0)
first, err = ds.FirstIndex()
Expand All @@ -211,6 +215,7 @@ func TestStorageCompact(t *testing.T) {
db, err := badger.Open(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()

ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
require.NoError(t, ds.reset(ents))
Expand All @@ -230,8 +235,10 @@ func TestStorageCompact(t *testing.T) {
}

for i, tt := range tests {
first, err := ds.FirstIndex()
require.NoError(t, err)
batch := db.NewWriteBatch()
err := ds.deleteUntil(batch, tt.i)
err = ds.deleteRange(batch, first-1, tt.i)
require.NoError(t, batch.Flush())
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
Expand Down Expand Up @@ -260,6 +267,7 @@ func TestStorageCreateSnapshot(t *testing.T) {
db, err := badger.Open(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()

ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
cs := &pb.ConfState{Nodes: []uint64{1, 2, 3}}
Expand Down Expand Up @@ -297,6 +305,7 @@ func TestStorageAppend(t *testing.T) {
db, err := badger.Open(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()

ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
tests := []struct {
Expand Down
1 change: 1 addition & 0 deletions worker/draft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestCalculateSnapshot(t *testing.T) {
db, err := openBadger(dir)
require.NoError(t, err)
ds := raftwal.Init(db, 0, 0)
defer ds.Closer.SignalAndWait()

n := newNode(ds, 1, 1, "")
var entries []raftpb.Entry
Expand Down
3 changes: 3 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func BlockingStop() {
glog.Infof("Stopping node...")
groups().Node.closer.SignalAndWait()

glog.Infof("Stopping raftwal store...")
groups().Node.Store.Closer.SignalAndWait()

glog.Infof("Stopping worker server...")
workerServer.Stop()
}
Expand Down