
Move checkpoint key to WAL (#3444)
The checkpoint key was being stored in the p directory. Can't think of a reason why I did that. Keeping the checkpoint key in the WAL makes natural sense and allows someone to drop the w directory without causing issues later. One such issue: if w is dropped, the Raft index would start from 1 again, but the p directory would have stored progress up to a much higher index, so none of the new proposals would get applied.

With this PR, the w directory can be dropped and the checkpoint gets dropped along with it.
This PR also adds two optimizations:

Calculates checkpoints starting from the last checkpoint, instead of from the first entry after the snapshot.
Does not calculate a snapshot if checkpoint - first < SnapshotAfter (see the sketch below).
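A minimal sketch of the second optimization, assuming a hypothetical skipSnapshot helper and a snapshotAfter threshold supplied by the caller (the real call sites may be structured differently):

package example

import "github.com/dgraph-io/dgraph/raftwal"

// skipSnapshot reports whether the snapshot calculation can be skipped. It reads
// the last checkpoint from the WAL (the key this PR moves into the w directory)
// and compares it against the first stored Raft index.
func skipSnapshot(ds *raftwal.DiskStorage, snapshotAfter uint64) (bool, error) {
	chk, err := ds.Checkpoint() // applied index recorded by UpdateCheckpoint
	if err != nil {
		return false, err
	}
	first, err := ds.FirstIndex()
	if err != nil {
		return false, err
	}
	// Too few entries have been applied since the first index; calculating a
	// snapshot now would not reclaim enough of the log to be worthwhile.
	return chk < first || chk-first < snapshotAfter, nil
}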
manishrjain committed May 22, 2019
1 parent 457891f commit 50a973b
Showing 5 changed files with 112 additions and 95 deletions.
3 changes: 0 additions & 3 deletions dgraph/cmd/debug/run.go
@@ -471,9 +471,6 @@ func printKeys(db *badger.DB) {

// Don't use a switch case here. Because multiple of these can be true. In particular,
// IsSchema can be true alongside IsData.
if pk.IsRaft() {
buf.WriteString("{r}")
}
if pk.IsData() {
buf.WriteString("{d}")
}
87 changes: 64 additions & 23 deletions raftwal/storage.go
@@ -27,9 +27,10 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
"go.etcd.io/etcd/raft"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/net/trace"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)

@@ -55,7 +56,7 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {

_, err = w.FirstIndex()
if err == errNotFound {
ents := make([]pb.Entry, 1)
ents := make([]raftpb.Entry, 1)
x.Check(w.reset(ents))
} else {
x.Check(err)
@@ -99,6 +100,14 @@ func (w *DiskStorage) hardStateKey() []byte {
return b
}

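// CheckpointKey returns the key under which the latest checkpoint is stored in
// the WAL: the 8-byte raft ID, the two-byte marker "ck", and the 4-byte group ID.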
func (w *DiskStorage) CheckpointKey() []byte {
b := make([]byte, 14)
binary.BigEndian.PutUint64(b[0:8], w.id)
copy(b[8:10], []byte("ck"))
binary.BigEndian.PutUint32(b[10:14], w.gid)
return b
}

func (w *DiskStorage) entryKey(idx uint64) []byte {
b := make([]byte, 20)
binary.BigEndian.PutUint64(b[0:8], w.id)
@@ -127,6 +136,38 @@ func (w *DiskStorage) StoreRaftId(id uint64) error {
})
}

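// UpdateCheckpoint marshals the given snapshot and stores it at the checkpoint key.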
func (w *DiskStorage) UpdateCheckpoint(snap *pb.Snapshot) error {
return w.db.Update(func(txn *badger.Txn) error {
data, err := snap.Marshal()
if err != nil {
return err
}
return txn.Set(w.CheckpointKey(), data)
})
}

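// Checkpoint returns the applied index recorded in the stored checkpoint, or
// zero if no checkpoint has been written yet.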
func (w *DiskStorage) Checkpoint() (uint64, error) {
var applied uint64
err := w.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(w.CheckpointKey())
if err == badger.ErrKeyNotFound {
return nil
}
if err != nil {
return err
}
return item.Value(func(val []byte) error {
var snap pb.Snapshot
if err := snap.Unmarshal(val); err != nil {
return err
}
applied = snap.Index
return nil
})
})
return applied, err
}

// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
@@ -142,7 +183,7 @@ func (w *DiskStorage) Term(idx uint64) (uint64, error) {
return 0, raft.ErrCompacted
}

var e pb.Entry
var e raftpb.Entry
if _, err := w.seekEntry(&e, idx, false); err == errNotFound {
return 0, raft.ErrUnavailable
} else if err != nil {
@@ -156,7 +197,7 @@ func (w *DiskStorage) Term(idx uint64) (uint64, error) {

var errNotFound = errors.New("Unable to find raft entry")

func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint64, error) {
func (w *DiskStorage) seekEntry(e *raftpb.Entry, seekTo uint64, reverse bool) (uint64, error) {
var index uint64
err := w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
@@ -193,7 +234,7 @@ var (
// into the latest Snapshot).
func (w *DiskStorage) FirstIndex() (uint64, error) {
if val, ok := w.cache.Load(snapshotKey); ok {
snap, ok := val.(*pb.Snapshot)
snap, ok := val.(*raftpb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return snap.Metadata.Index + 1, nil
}
@@ -267,9 +308,9 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
func (w *DiskStorage) Snapshot() (snap raftpb.Snapshot, rerr error) {
if val, ok := w.cache.Load(snapshotKey); ok {
snap, ok := val.(*pb.Snapshot)
snap, ok := val.(*raftpb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return *snap, nil
}
@@ -292,7 +333,7 @@ func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
// setSnapshot would store the snapshot. We can delete all the entries up until the snapshot
// index. But, keep the raft entry at the snapshot index, to make it easier to build the logic; like
// the dummy entry in MemoryStorage.
func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error {
func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s raftpb.Snapshot) error {
if raft.IsEmptySnap(s) {
return nil
}
@@ -304,7 +345,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
return err
}

e := pb.Entry{Term: s.Metadata.Term, Index: s.Metadata.Index}
e := raftpb.Entry{Term: s.Metadata.Term, Index: s.Metadata.Index}
data, err = e.Marshal()
if err != nil {
return err
@@ -328,7 +369,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
}

// SetHardState saves the current HardState.
func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) error {
func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st raftpb.HardState) error {
if raft.IsEmptyHardState(st) {
return nil
}
@@ -340,7 +381,7 @@ func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) er
}

// reset resets the entries. Used for testing.
func (w *DiskStorage) reset(es []pb.Entry) error {
func (w *DiskStorage) reset(es []raftpb.Entry) error {
w.cache = new(sync.Map) // reset cache.

// Clean out the state.
@@ -400,7 +441,7 @@ func (w *DiskStorage) deleteFrom(batch *badger.WriteBatch, from uint64) error {
return w.deleteKeys(batch, keys)
}

func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
func (w *DiskStorage) HardState() (hd raftpb.HardState, rerr error) {
w.elog.Printf("HardState")
defer w.elog.Printf("Done")
err := w.db.View(func(txn *badger.Txn) error {
@@ -419,14 +460,14 @@ func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
}

// InitialState returns the saved HardState and ConfState information.
func (w *DiskStorage) InitialState() (hs pb.HardState, cs pb.ConfState, err error) {
func (w *DiskStorage) InitialState() (hs raftpb.HardState, cs raftpb.ConfState, err error) {
w.elog.Printf("InitialState")
defer w.elog.Printf("Done")
hs, err = w.HardState()
if err != nil {
return
}
var snap pb.Snapshot
var snap raftpb.Snapshot
snap, err = w.Snapshot()
if err != nil {
return
@@ -452,15 +493,15 @@ func (w *DiskStorage) NumEntries() (int, error) {
return count, err
}

func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []raftpb.Entry, rerr error) {
err := w.db.View(func(txn *badger.Txn) error {
if hi-lo == 1 { // We only need one entry.
item, err := txn.Get(w.entryKey(lo))
if err != nil {
return err
}
return item.Value(func(val []byte) error {
var e pb.Entry
var e raftpb.Entry
if err = e.Unmarshal(val); err != nil {
return err
}
@@ -481,7 +522,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
first := true
for itr.Seek(start); itr.Valid(); itr.Next() {
item := itr.Item()
var e pb.Entry
var e raftpb.Entry
if err := item.Value(func(val []byte) error {
return e.Unmarshal(val)
}); err != nil {
@@ -509,7 +550,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []raftpb.Entry, rerr error) {
w.elog.Printf("Entries: [%d, %d) maxSize:%d", lo, hi, maxSize)
defer w.elog.Printf("Done")
first, err := w.FirstIndex()
@@ -531,7 +572,7 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
return w.allEntries(lo, hi, maxSize)
}

func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte) error {
glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
first, err := w.FirstIndex()
if err != nil {
@@ -542,15 +583,15 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
return raft.ErrSnapOutOfDate
}

var e pb.Entry
var e raftpb.Entry
if _, err := w.seekEntry(&e, i, false); err != nil {
return err
}
if e.Index != i {
return errNotFound
}

var snap pb.Snapshot
var snap raftpb.Snapshot
snap.Metadata.Index = i
snap.Metadata.Term = e.Term
x.AssertTrue(cs != nil)
@@ -572,7 +613,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
// first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic
// writes then all of them can be written together. Note that when writing an Entry with Index i,
// any previously-persisted entries with Index >= i must be discarded.
func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) error {
func (w *DiskStorage) Save(h raftpb.HardState, es []raftpb.Entry, snap raftpb.Snapshot) error {
batch := w.db.NewWriteBatch()
defer batch.Cancel()

Expand All @@ -589,7 +630,7 @@ func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) erro
}

// Append the new entries to storage.
func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []pb.Entry) error {
func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []raftpb.Entry) error {
if len(entries) == 0 {
return nil
}
