Move checkpoint key to WAL (hypermodeinc#3444)
The checkpoint key was being stored in the p directory; I can't think of a reason why I did that. Keeping the checkpoint key in the WAL makes natural sense, and allows someone to drop the w directory without causing issues later. One such issue: if w is dropped, the Raft index starts from 1 again, but the p directory would have recorded progress up to a much higher index, so none of the new proposals would ever get applied.
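As a minimal illustration of that failure mode (hypothetical names, not Dgraph's actual apply loop), consider an applier that skips any entry at or below the stored checkpoint:

```go
package main

import "fmt"

// alreadyApplied mimics an apply loop that skips Raft entries at or below
// the stored checkpoint.
func alreadyApplied(checkpoint, idx uint64) bool {
	return idx <= checkpoint
}

func main() {
	checkpoint := uint64(5000) // stale value persisted in p; survives dropping w
	for idx := uint64(1); idx <= 3; idx++ { // after dropping w, indices restart at 1
		fmt.Printf("entry %d skipped: %v\n", idx, alreadyApplied(checkpoint, idx))
	}
}
```

Every new entry prints `skipped: true`, so the node looks healthy while silently applying nothing.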

With this PR, the w directory can be dropped, and the checkpoint gets dropped along with it.
This PR also adds two optimizations:

Calculates the checkpoint starting from the last checkpoint, instead of from the first entry since the snapshot.
Does not calculate a snapshot if checkpoint - first < SnapshotAfter (see the sketch after this list).
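The second optimization amounts to a cheap guard before the snapshot work. A rough sketch (hypothetical helper, not the exact code in this PR):

```go
// needSnapshot reports whether it is worth calculating a snapshot.
// checkpoint is the last applied index recorded in the WAL, first is the
// first index still present in the Raft log, and snapshotAfter is the
// configured threshold. Hypothetical sketch, for illustration only.
func needSnapshot(checkpoint, first, snapshotAfter uint64) bool {
	if checkpoint < first {
		return false // also guards against uint64 underflow below
	}
	return checkpoint-first >= snapshotAfter
}
```

Only when this returns true does the more expensive scan over the entries need to run.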
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 24916cf commit f44dbb0
Showing 6 changed files with 111 additions and 94 deletions.
3 changes: 0 additions & 3 deletions dgraph/cmd/debug/run.go
@@ -497,9 +497,6 @@ func printKeys(db *badger.DB) {

// Don't use a switch case here. Because multiple of these can be true. In particular,
// IsSchema can be true alongside IsData.
- if pk.IsRaft() {
- buf.WriteString("{r}")
- }
if pk.IsData() {
buf.WriteString("{d}")
}
6 changes: 6 additions & 0 deletions dgraph/cmd/debug/wal.go
@@ -91,6 +91,12 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
fmt.Printf("Hardstate: %+v\n", hs)
}

+ if chk, err := store.Checkpoint(); err != nil {
+ fmt.Printf("Got error while retrieving checkpoint: %v\n", err)
+ } else {
+ fmt.Printf("Checkpoint: %d\n", chk)
+ }
+
lastIdx, err := store.LastIndex()
if err != nil {
fmt.Printf("Got error while retrieving last index: %v\n", err)
87 changes: 64 additions & 23 deletions raftwal/storage.go
@@ -27,9 +27,10 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
"go.etcd.io/etcd/raft"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/net/trace"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)

@@ -57,7 +58,7 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {

_, err = w.FirstIndex()
if err == errNotFound {
- ents := make([]pb.Entry, 1)
+ ents := make([]raftpb.Entry, 1)
x.Check(w.reset(ents))
} else {
x.Check(err)
@@ -101,6 +102,14 @@ func (w *DiskStorage) HardStateKey() []byte {
return b
}

+ func (w *DiskStorage) CheckpointKey() []byte {
+ b := make([]byte, 14)
+ binary.BigEndian.PutUint64(b[0:8], w.id)
+ copy(b[8:10], []byte("ck"))
+ binary.BigEndian.PutUint32(b[10:14], w.gid)
+ return b
+ }
+
func (w *DiskStorage) EntryKey(idx uint64) []byte {
b := make([]byte, 20)
binary.BigEndian.PutUint64(b[0:8], w.id)
@@ -129,6 +138,38 @@ func (w *DiskStorage) StoreRaftId(id uint64) error {
})
}

+ func (w *DiskStorage) UpdateCheckpoint(snap *pb.Snapshot) error {
+ return w.db.Update(func(txn *badger.Txn) error {
+ data, err := snap.Marshal()
+ if err != nil {
+ return err
+ }
+ return txn.Set(w.CheckpointKey(), data)
+ })
+ }
+
+ func (w *DiskStorage) Checkpoint() (uint64, error) {
+ var applied uint64
+ err := w.db.View(func(txn *badger.Txn) error {
+ item, err := txn.Get(w.CheckpointKey())
+ if err == badger.ErrKeyNotFound {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ return item.Value(func(val []byte) error {
+ var snap pb.Snapshot
+ if err := snap.Unmarshal(val); err != nil {
+ return err
+ }
+ applied = snap.Index
+ return nil
+ })
+ })
+ return applied, err
+ }
+
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
@@ -144,7 +185,7 @@ func (w *DiskStorage) Term(idx uint64) (uint64, error) {
return 0, raft.ErrCompacted
}

- var e pb.Entry
+ var e raftpb.Entry
if _, err := w.seekEntry(&e, idx, false); err == errNotFound {
return 0, raft.ErrUnavailable
} else if err != nil {
@@ -158,7 +199,7 @@ func (w *DiskStorage) Term(idx uint64) (uint64, error) {

var errNotFound = errors.New("Unable to find raft entry")

- func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint64, error) {
+ func (w *DiskStorage) seekEntry(e *raftpb.Entry, seekTo uint64, reverse bool) (uint64, error) {
var index uint64
err := w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
@@ -195,7 +236,7 @@ var (
// into the latest Snapshot).
func (w *DiskStorage) FirstIndex() (uint64, error) {
if val, ok := w.cache.Load(snapshotKey); ok {
- snap, ok := val.(*pb.Snapshot)
+ snap, ok := val.(*raftpb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return snap.Metadata.Index + 1, nil
}
@@ -269,9 +310,9 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
- func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
+ func (w *DiskStorage) Snapshot() (snap raftpb.Snapshot, rerr error) {
if val, ok := w.cache.Load(snapshotKey); ok {
- snap, ok := val.(*pb.Snapshot)
+ snap, ok := val.(*raftpb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return *snap, nil
}
@@ -294,7 +335,7 @@ func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
// setSnapshot would store the snapshot. We can delete all the entries up until the snapshot
// index. But, keep the raft entry at the snapshot index, to make it easier to build the logic; like
// the dummy entry in MemoryStorage.
- func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error {
+ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s raftpb.Snapshot) error {
if raft.IsEmptySnap(s) {
return nil
}
@@ -306,7 +347,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
return err
}

- e := pb.Entry{Term: s.Metadata.Term, Index: s.Metadata.Index}
+ e := raftpb.Entry{Term: s.Metadata.Term, Index: s.Metadata.Index}
data, err = e.Marshal()
if err != nil {
return err
@@ -330,7 +371,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
}

// SetHardState saves the current HardState.
- func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) error {
+ func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st raftpb.HardState) error {
if raft.IsEmptyHardState(st) {
return nil
}
@@ -342,7 +383,7 @@ func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) er
}

// reset resets the entries. Used for testing.
- func (w *DiskStorage) reset(es []pb.Entry) error {
+ func (w *DiskStorage) reset(es []raftpb.Entry) error {
w.cache = new(sync.Map) // reset cache.

// Clean out the state.
@@ -402,7 +443,7 @@ func (w *DiskStorage) deleteFrom(batch *badger.WriteBatch, from uint64) error {
return w.deleteKeys(batch, keys)
}

- func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
+ func (w *DiskStorage) HardState() (hd raftpb.HardState, rerr error) {
w.elog.Printf("HardState")
defer w.elog.Printf("Done")
err := w.db.View(func(txn *badger.Txn) error {
@@ -421,14 +462,14 @@ func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
}

// InitialState returns the saved HardState and ConfState information.
- func (w *DiskStorage) InitialState() (hs pb.HardState, cs pb.ConfState, err error) {
+ func (w *DiskStorage) InitialState() (hs raftpb.HardState, cs raftpb.ConfState, err error) {
w.elog.Printf("InitialState")
defer w.elog.Printf("Done")
hs, err = w.HardState()
if err != nil {
return
}
- var snap pb.Snapshot
+ var snap raftpb.Snapshot
snap, err = w.Snapshot()
if err != nil {
return
@@ -454,15 +495,15 @@ func (w *DiskStorage) NumEntries() (int, error) {
return count, err
}

- func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
+ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []raftpb.Entry, rerr error) {
err := w.db.View(func(txn *badger.Txn) error {
if hi-lo == 1 { // We only need one entry.
item, err := txn.Get(w.EntryKey(lo))
if err != nil {
return err
}
return item.Value(func(val []byte) error {
- var e pb.Entry
+ var e raftpb.Entry
if err = e.Unmarshal(val); err != nil {
return err
}
@@ -483,7 +524,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
first := true
for itr.Seek(start); itr.Valid(); itr.Next() {
item := itr.Item()
- var e pb.Entry
+ var e raftpb.Entry
if err := item.Value(func(val []byte) error {
return e.Unmarshal(val)
}); err != nil {
@@ -511,7 +552,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
- func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
+ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []raftpb.Entry, rerr error) {
w.elog.Printf("Entries: [%d, %d) maxSize:%d", lo, hi, maxSize)
defer w.elog.Printf("Done")
first, err := w.FirstIndex()
@@ -533,7 +574,7 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
return w.allEntries(lo, hi, maxSize)
}

- func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
+ func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte) error {
glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
first, err := w.FirstIndex()
if err != nil {
@@ -544,15 +585,15 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
return raft.ErrSnapOutOfDate
}

- var e pb.Entry
+ var e raftpb.Entry
if _, err := w.seekEntry(&e, i, false); err != nil {
return err
}
if e.Index != i {
return errNotFound
}

- var snap pb.Snapshot
+ var snap raftpb.Snapshot
snap.Metadata.Index = i
snap.Metadata.Term = e.Term
x.AssertTrue(cs != nil)
@@ -574,7 +615,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
// first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic
// writes then all of them can be written together. Note that when writing an Entry with Index i,
// any previously-persisted entries with Index >= i must be discarded.
- func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) error {
+ func (w *DiskStorage) Save(h raftpb.HardState, es []raftpb.Entry, snap raftpb.Snapshot) error {
batch := w.db.NewWriteBatch()
defer batch.Cancel()

@@ -591,7 +632,7 @@ func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) erro
}

// Append the new entries to storage.
- func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []pb.Entry) error {
+ func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []raftpb.Entry) error {
if len(entries) == 0 {
return nil
}