Skip to content

Commit

Permalink
fix(Alpha): Immediately take a snapshot if we don't have one (#6458) (#…
Browse files Browse the repository at this point in the history
…6970)

This PR causes the Alpha leader to trigger snapshot creation immediately if it doesn't already have a snapshot.

This is useful when you bring up a new cluster from the bulk loader. If you remove an Alpha instance and add a new Alpha instance, the new follower won't get a snapshot if the leader doesn't have one. This PR avoids that by making the leader create a snapshot as soon as possible.

Fixes DGRAPH-2445

(cherry picked from commit 6bd7f8e)

Co-authored-by: Manish R Jain <[email protected]>
  • Loading branch information
OmarAyo and manishrjain authored Nov 24, 2020
1 parent 9e15123 commit e965dd8
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,12 +933,22 @@ func (n *node) checkpointAndClose(done chan struct{}) {
}

if n.AmLeader() {
var calculate bool
// If leader doesn't have a snapshot, we should create one immediately. This is very
// useful when you bring up the cluster from bulk loader. If you remove an alpha and
// add a new alpha, the new follower won't get a snapshot if the leader doesn't have
// one.
snap, err := n.Store.Snapshot()
if err != nil {
glog.Errorf("While retrieving snapshot from Store: %v\n", err)
continue
}
calculate := raft.IsEmptySnap(snap) // If no snapshot, then calculate one immediately.

if chk, err := n.Store.Checkpoint(); err == nil {
if first, err := n.Store.FirstIndex(); err == nil {
// Save some cycles by only calculating snapshot if the checkpoint has gone
// quite a bit further than the first index.
calculate = chk >= first+uint64(x.WorkerConfig.SnapshotAfter)
calculate = calculate || chk >= first+uint64(x.WorkerConfig.SnapshotAfter)
glog.V(3).Infof("Evaluating snapshot first:%d chk:%d (chk-first:%d) "+
"snapshotAfter:%d snap:%v", first, chk, chk-first,
x.WorkerConfig.SnapshotAfter, calculate)
Expand All @@ -956,7 +966,10 @@ func (n *node) checkpointAndClose(done chan struct{}) {
// snapshotting. We just need to do enough, so that we don't have a huge backlog of
// entries to process on a restart.
if calculate {
if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
// We can set discardN argument to zero, because we already know that calculate
// would be true if either we absolutely needed to calculate the snapshot,
// or our checkpoint already crossed the SnapshotAfter threshold.
if err := n.proposeSnapshot(0); err != nil {
glog.Errorf("While calculating and proposing snapshot: %v", err)
}
}
Expand Down

0 comments on commit e965dd8

Please sign in to comment.