Skip to content

Commit

Permalink
raft: fix correctness bug in CommittedEntries pagination
Browse files Browse the repository at this point in the history
In etcd-io#9982, a mechanism to limit the size of `CommittedEntries` was
introduced. This mechanism worked by loading the
applicable entries (passing the max size hint) and emitting a
`HardState` whose commit index was truncated to match the limitation
applied to the entries. Unfortunately, this was subtly incorrect
when the user-provided `Entries` implementation didn't exactly
match what Raft uses internally. Depending on whether a `Node` or
a `RawNode` was used, this would either lead to regressing the
HardState's commit index or outright forgetting to apply entries,
respectively.

Asking implementers to precisely match the Raft size limitation
semantics was considered but looks like a bad idea as it puts
correctness squarely in the hands of downstream users. Instead, this
PR removes the truncation of `HardState` when limiting is active
and tracks the applied index separately. This removes the old
paradigm (that the previous code tried to work around) that the
client will always apply all the way to the commit index, which
isn't true when committed entries are paginated.

See [1] for more on the discovery of this bug (CockroachDB's
implementation of `Entries` returns one more entry than Raft's when the
size limit hits).

[1]: cockroachdb/cockroach#28918 (comment)
  • Loading branch information
tbg committed Sep 4, 2018
1 parent 6143c13 commit b844bdd
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 22 deletions.
31 changes: 20 additions & 11 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool {
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
}

// appliedCursor reports the highest log index that the client will have
// applied once this Ready has been acknowledged via Advance. The last entry
// of CommittedEntries takes precedence; otherwise the index recorded in the
// Ready's snapshot metadata is used. A result of zero means the Ready carries
// no information about applied state.
func (rd Ready) appliedCursor() uint64 {
	if last := len(rd.CommittedEntries) - 1; last >= 0 {
		return rd.CommittedEntries[last].Index
	}
	// The snapshot index is unsigned, so a zero index already produces the
	// "no information" result without needing a separate branch.
	return rd.Snapshot.Metadata.Index
}

// Node represents a node in a raft cluster.
type Node interface {
// Tick increments the internal logical clock for the Node by a single tick. Election
Expand Down Expand Up @@ -282,6 +295,7 @@ func (n *node) run(r *raft) {
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready

lead := None
Expand Down Expand Up @@ -381,13 +395,17 @@ func (n *node) run(r *raft) {
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
}

r.msgs = nil
r.readStates = nil
advancec = n.advancec
case <-advancec:
if prevHardSt.Commit != 0 {
r.raftLog.appliedTo(prevHardSt.Commit)
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
Expand Down Expand Up @@ -559,15 +577,6 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
// If we hit a size limit when loading CommittedEntries, clamp
// our HardState.Commit to what we're actually returning. This is
// also used as our cursor to resume for the next Ready batch.
if len(rd.CommittedEntries) > 0 {
lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
if rd.HardState.Commit > lastCommit.Index {
rd.HardState.Commit = lastCommit.Index
}
}
}
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
Expand Down
71 changes: 71 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package raft
import (
"bytes"
"context"
"fmt"
"math"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -926,3 +928,72 @@ func TestCommitPagination(t *testing.T) {
s.Append(rd.Entries)
n.Advance()
}

// ignoreSizeHintMemStorage is a test-only storage that embeds a
// *MemoryStorage and overrides its Entries method so that the caller's
// maxSize argument is disregarded. It models a user-provided storage whose
// size limiting does not match the one Raft applies internally.
type ignoreSizeHintMemStorage struct {
*MemoryStorage
}

// Entries returns the entries in [lo, hi) from the embedded MemoryStorage.
// The maxSize argument is deliberately dropped: the underlying call is always
// made with the largest possible limit.
func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
	const unlimited uint64 = math.MaxUint64
	return s.MemoryStorage.Entries(lo, hi, unlimited)
}

// TestNodeCommitPaginationAfterRestart is a regression test for a scenario in
// which the Storage's size limiting in Entries is slightly more permissive
// than Raft's internal one. The original bug unfolded as follows:
//
// - the node learns that index 11 (or 100, it doesn't matter) is committed
// - nextEnts returns index 1..10 in CommittedEntries due to size limiting;
//   however, index 10 already exceeds maxBytes because of a user-provided
//   implementation of Entries
// - the Commit index gets bumped to 10
// - the node persists the HardState, but crashes before applying the entries
// - upon restart, the storage returns the same entries, but `slice` takes a
//   different code path (it is now called with an upper bound of 10) and
//   removes the last entry
// - Raft emits a HardState with a regressing commit index
//
// A simpler version of this test would have the storage return far fewer
// entries than dictated by maxSize (for example, exactly one entry) after the
// restart, resulting in a larger regression. That variant would not need to
// exploit anything about Raft-internal code paths to fail.
func TestNodeCommitPaginationAfterRestart(t *testing.T) {
	const numEntries = 10

	wantHardState := raftpb.HardState{Term: 1, Vote: 1, Commit: numEntries}

	storage := &ignoreSizeHintMemStorage{MemoryStorage: NewMemoryStorage()}
	storage.hardState = wantHardState

	// Populate the log with numEntries one-byte entries, tracking their
	// combined encoded size.
	var totalSize uint64
	entries := make([]raftpb.Entry, 0, numEntries)
	for idx := uint64(1); idx <= numEntries; idx++ {
		e := raftpb.Entry{
			Term:  1,
			Index: idx,
			Type:  raftpb.EntryNormal,
			Data:  []byte("a"),
		}
		entries = append(entries, e)
		totalSize += uint64(e.Size())
	}
	storage.ents = entries

	cfg := newTestConfig(1, []uint64{1}, 10, 1, storage)
	// Pick a MaxSizePerMsg that would suggest to Raft that the last committed
	// entry should not be part of the initial rd.CommittedEntries. The storage
	// ignores this limit and *will* return that entry (which is how the Commit
	// index ended up being 10 in the first place).
	lastSize := uint64(entries[len(entries)-1].Size())
	cfg.MaxSizePerMsg = totalSize - lastSize - 1

	n := newNode()
	go n.run(newRaft(cfg))
	defer n.Stop()

	rd := readyWithTimeout(&n)
	if IsEmptyHardState(rd.HardState) || rd.HardState.Commit >= wantHardState.Commit {
		// Either no HardState was emitted or its commit index did not regress.
		return
	}
	quote := func(data []byte) string { return fmt.Sprintf("%q", data) }
	t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v",
		wantHardState.Commit, rd.HardState.Commit,
		DescribeEntries(rd.CommittedEntries, quote),
	)
}
19 changes: 8 additions & 11 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,15 @@ func (rn *RawNode) commitReady(rd Ready) {
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
if rn.prevHardSt.Commit != 0 {
// In most cases, prevHardSt and rd.HardState will be the same
// because when there are new entries to apply we just sent a
// HardState with an updated Commit value. However, on initial
// startup the two are different because we don't send a HardState
// until something changes, but we do send any un-applied but
// committed entries (and previously-committed entries may be
// incorporated into the snapshot, even if rd.CommittedEntries is
// empty). Therefore we mark all committed entries as applied
// whether they were included in rd.HardState or not.
rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit)

// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
rn.raft.raftLog.appliedTo(index)
}

if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
rn.raft.raftLog.stableTo(e.Index, e.Term)
Expand Down
77 changes: 77 additions & 0 deletions raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,80 @@ func TestRawNodeStatus(t *testing.T) {
t.Errorf("expected status struct, got nil")
}
}

// TestRawNodeCommitPaginationAfterRestart is the RawNode counterpart of
// TestNodeCommitPaginationAfterRestart. The anomaly here was even worse,
// because the Raft group would forget to apply entries:
//
// - the node learns that index 11 is committed
// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
//   exceeds maxBytes), which isn't noticed internally by Raft
// - the Commit index gets bumped to 10
// - the node persists the HardState, but crashes before applying the entries
// - upon restart, the storage returns the same entries, but `slice` takes a
//   different code path and removes the last entry
// - Raft does not emit a HardState, but when the app calls Advance(), it bumps
//   its internal applied index cursor to 10 (when it should be 9)
// - the next Ready asks the app to apply index 11 (omitting index 10), losing
//   a write
func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
	const numEntries = 10

	storage := &ignoreSizeHintMemStorage{MemoryStorage: NewMemoryStorage()}
	storage.hardState = raftpb.HardState{Term: 1, Vote: 1, Commit: numEntries}

	// Populate the log with numEntries one-byte entries, tracking their
	// combined encoded size.
	var totalSize uint64
	entries := make([]raftpb.Entry, 0, numEntries)
	for idx := uint64(1); idx <= numEntries; idx++ {
		e := raftpb.Entry{
			Term:  1,
			Index: idx,
			Type:  raftpb.EntryNormal,
			Data:  []byte("a"),
		}
		entries = append(entries, e)
		totalSize += uint64(e.Size())
	}
	storage.ents = entries

	cfg := newTestConfig(1, []uint64{1}, 10, 1, storage)
	// Pick a MaxSizePerMsg that would suggest to Raft that the last committed
	// entry should not be part of the initial rd.CommittedEntries. The storage
	// ignores this limit and *will* return that entry (which is how the Commit
	// index ended up being 10 in the first place).
	lastSize := uint64(entries[len(entries)-1].Size())
	cfg.MaxSizePerMsg = totalSize - lastSize - 1

	// Add one more entry beyond the persisted commit index; the heartbeats
	// stepped in the loop below carry Commit: 11.
	storage.ents = append(storage.ents, raftpb.Entry{
		Term:  1,
		Index: uint64(11),
		Type:  raftpb.EntryNormal,
		Data:  []byte("boom"),
	})

	rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
	if err != nil {
		t.Fatal(err)
	}

	// Keep consuming Ready batches until index 11 has been handed out,
	// verifying that each batch is non-empty and continues exactly where the
	// previous one stopped.
	var applied uint64
	for applied != 11 {
		rd := rawNode.Ready()
		committed := rd.CommittedEntries
		if len(committed) == 0 {
			t.Fatalf("stopped applying entries at index %d", applied)
		}
		first := committed[0].Index
		if applied != 0 && first != applied+1 {
			t.Fatalf("attempting to apply index %d after index %d, leaving a gap", first, applied)
		}
		applied = committed[len(committed)-1].Index
		rawNode.Advance(rd)
		// The result of Step is intentionally discarded, as in the original
		// test; the loop's checks on the following Ready are what matter.
		_ = rawNode.Step(raftpb.Message{
			Type:   raftpb.MsgHeartbeat,
			To:     1,
			From:   1, // illegal, but we get away with it
			Term:   1,
			Commit: 11,
		})
	}
}
10 changes: 10 additions & 0 deletions raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string {
return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
}

// DescribeEntries formats every entry in ents with DescribeEntry (using the
// formatter f) and concatenates the results, terminating each description
// with a newline. An empty slice yields the empty string.
func DescribeEntries(ents []pb.Entry, f EntryFormatter) string {
	var out bytes.Buffer
	for i := range ents {
		// Writes to a bytes.Buffer always return a nil error, so the results
		// are not inspected.
		out.WriteString(DescribeEntry(ents[i], f))
		out.WriteByte('\n')
	}
	return out.String()
}

func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
if len(ents) == 0 {
return ents
Expand Down

0 comments on commit b844bdd

Please sign in to comment.