Skip to content

Commit

Permalink
Fix a few issues: Use snapshot.Index for raft.Cfg.Applied. Do not ove…
Browse files Browse the repository at this point in the history
…rwrite any existing data when apply txn commits. Do not let CreateSnapshot fail.
  • Loading branch information
manishrjain committed Sep 22, 2018
1 parent 47dd180 commit a4bd06f
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 10 deletions.
8 changes: 6 additions & 2 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Node struct {
}

func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
first, err := store.FirstIndex()
snap, err := store.Snapshot()
x.Check(err)

n := &Node{
Expand Down Expand Up @@ -102,7 +102,11 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
// In case this is a new Raft log, first would be 1, and therefore
// Applied would be zero, hence meeting the condition by the library
// that Applied should only be set during a restart.
Applied: first - 1,
//
// Update: Set the Applied to the latest snapshot, because it seems
// like somehow the first index can be out of sync with the latest
// snapshot.
Applied: snap.Metadata.Index,
},
// processConfChange etc are not throttled so some extra delta, so that we don't
// block tick when applyCh is full
Expand Down
15 changes: 12 additions & 3 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,20 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
}
n.elog.Printf("Creating snapshot: %+v", snap)
glog.Infof("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
data, err := snap.Marshal()
x.Check(err)

// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(snap.ReadTs)
return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)

data, err := snap.Marshal()
x.Check(err)
for {
// We should never let CreateSnapshot have an error.
err := n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)
if err == nil {
return nil
}
glog.Warningf("Error while calling CreateSnapshot: %v. Retrying...", err)
}

} else {
x.Fatalf("Unknown proposal")
Expand Down
23 changes: 18 additions & 5 deletions x/badger.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package x

import (
"math"
"sync"

"github.com/dgraph-io/badger"
"github.com/golang/glog"
)

type TxnWriter struct {
Expand All @@ -25,11 +25,24 @@ func (w *TxnWriter) cb(err error) {
}

func (w *TxnWriter) SetAt(key, val []byte, meta byte, ts uint64) error {
txn := w.DB.NewTransactionAt(math.MaxUint64, true)
txn := w.DB.NewTransactionAt(ts, true)
defer txn.Discard()
// TODO: We should probably do a Get to ensure that we don't end up
// overwriting an already existing value at that ts, which might be there
// due to a previous rollup event.

// We do a Get to ensure that we don't end up overwriting an already
// existing delta or state at the ts.
if item, err := txn.Get(key); err == badger.ErrKeyNotFound {
// pass
} else if err != nil {
return err

} else if item.Version() == ts {
// Found an existing value there. So, skip writing.
if glog.V(2) {
pk := Parse(key)
glog.Warning("Skipping write to key: %+v. Found existing version at: %d", pk, ts)
}
return nil
}
if err := txn.SetWithMeta(key, val, meta); err != nil {
return err
}
Expand Down

0 comments on commit a4bd06f

Please sign in to comment.