Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/norace_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3606,7 +3606,7 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ae, err := node.decodeAppendEntry(sm.msg, nil, _EMPTY_)
ae, err := decodeAppendEntry(sm.msg, nil, _EMPTY_)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2424,7 +2424,7 @@ func (ae *appendEntry) encode(b []byte) ([]byte, error) {
}

// This can not be used post the wire level callback since we do not copy.
func (n *raft) decodeAppendEntry(msg []byte, sub *subscription, reply string) (*appendEntry, error) {
func decodeAppendEntry(msg []byte, sub *subscription, reply string) (*appendEntry, error) {
if len(msg) < appendEntryBaseLen {
return nil, errBadAppendEntry
}
Expand Down Expand Up @@ -2509,7 +2509,7 @@ func (ar *appendEntryResponse) encode(b []byte) []byte {
// Track all peers we may have ever seen to use an string interns for appendEntryResponse decoding.
var peers sync.Map

func (n *raft) decodeAppendEntryResponse(msg []byte) *appendEntryResponse {
func decodeAppendEntryResponse(msg []byte) *appendEntryResponse {
if len(msg) != appendEntryResponseLen {
return nil
}
Expand Down Expand Up @@ -2972,7 +2972,7 @@ func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
if err != nil {
return nil, err
}
return n.decodeAppendEntry(sm.msg, nil, _EMPTY_)
return decodeAppendEntry(sm.msg, nil, _EMPTY_)
}

// applyCommit will update our commit index and apply the entry to the apply queue.
Expand Down Expand Up @@ -3302,7 +3302,7 @@ func (n *raft) runAsCandidate() {
// is an internal callback from the "asubj" append entry subscription.
func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, _, reply string, msg []byte) {
msg = copyBytes(msg)
if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil {
if ae, err := decodeAppendEntry(msg, sub, reply); err == nil {
// Push to the new entry channel. From here one of the worker
// goroutines (runAsLeader, runAsFollower, runAsCandidate) will
// pick it up.
Expand Down Expand Up @@ -3894,7 +3894,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {

// handleAppendEntryResponse processes responses to append entries.
func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
ar := n.decodeAppendEntryResponse(msg)
ar := decodeAppendEntryResponse(msg)
ar.reply = reply
n.resp.push(ar)
}
Expand Down
28 changes: 13 additions & 15 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,8 @@ func TestNRGAppendEntryDecode(t *testing.T) {
require_NoError(t, err)

// Truncate buffer first.
var node *raft
short := buf[0 : len(buf)-1025]
_, err = node.decodeAppendEntry(short, nil, _EMPTY_)
_, err = decodeAppendEntry(short, nil, _EMPTY_)
require_Error(t, err, errBadAppendEntry)

for i := 0; i < 100; i++ {
Expand All @@ -144,7 +143,7 @@ func TestNRGAppendEntryDecode(t *testing.T) {
bi := 42 + rand.Intn(len(b)-42)
if b[bi] != 0 && bi != 40 {
b[bi] = 0
_, err = node.decodeAppendEntry(b, nil, _EMPTY_)
_, err = decodeAppendEntry(b, nil, _EMPTY_)
require_Error(t, err, errBadAppendEntry)
}
}
Expand Down Expand Up @@ -279,7 +278,7 @@ func TestNRGAEFromOldLeader(t *testing.T) {
require_NoError(t, err)

// Wait for the response, the server should have rejected it.
ar := leader.decodeAppendEntryResponse(resp.Data)
ar := decodeAppendEntryResponse(resp.Data)
require_NotNil(t, ar)
require_Equal(t, ar.success, false)

Expand Down Expand Up @@ -400,8 +399,7 @@ func TestNRGLeaderTransfer(t *testing.T) {
return err
}

leader := newLeader.node().(*raft)
ae, err := leader.decodeAppendEntry(msg.Data, nil, msg.Reply)
ae, err := decodeAppendEntry(msg.Data, nil, msg.Reply)
if err != nil {
return err
}
Expand Down Expand Up @@ -473,7 +471,7 @@ func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) {

// We're going to modify the append entry that we received so that
// we can send it again with modifications.
ae, err := leader.decodeAppendEntry(msg.Data, nil, msg.Reply)
ae, err := decodeAppendEntry(msg.Data, nil, msg.Reply)
require_NoError(t, err)

// First of all we're going to try sending an append entry that
Expand Down Expand Up @@ -931,7 +929,7 @@ func TestNRGNoResetOnAppendEntryResponse(t *testing.T) {
// The higher term in this case is what would cause the leader previously
// to reset the entire log which it shouldn't do.
_, err := nc.Subscribe(fmt.Sprintf(raftAppendSubj, "TEST"), func(msg *nats.Msg) {
if ae, err := follower.decodeAppendEntry(msg.Data, nil, msg.Reply); err == nil && len(ae.entries) > 0 {
if ae, err := decodeAppendEntry(msg.Data, nil, msg.Reply); err == nil && len(ae.entries) > 0 {
ar := newAppendEntryResponse(ae.term+1, ae.commit, follower.id, false)
require_NoError(t, msg.Respond(ar.encode(nil)))
}
Expand Down Expand Up @@ -1023,7 +1021,7 @@ func TestNRGCandidateDontStepdownDueToLeaderOfPreviousTerm(t *testing.T) {
defer rg.unlockAll()

// Decode the append entry
ae, err := leader.decodeAppendEntry(msg.Data, nil, msg.Reply)
ae, err := decodeAppendEntry(msg.Data, nil, msg.Reply)
require_NoError(t, err)

// Check that the append entry is from the leader
Expand Down Expand Up @@ -2434,7 +2432,7 @@ func TestNRGCatchupDontCountTowardQuorum(t *testing.T) {
// Should reply we require catchup.
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
ar := n.decodeAppendEntryResponse(msg.Data)
ar := decodeAppendEntryResponse(msg.Data)
require_Equal(t, ar.index, 0)
require_False(t, ar.success)
require_True(t, strings.HasPrefix(msg.Reply, "$NRG.CR"))
Expand All @@ -2452,7 +2450,7 @@ func TestNRGCatchupDontCountTowardQuorum(t *testing.T) {
n.processAppendEntry(aeHeartbeat, n.aesub)
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
ar = n.decodeAppendEntryResponse(msg.Data)
ar = decodeAppendEntryResponse(msg.Data)
require_Equal(t, ar.index, aeHeartbeat.pindex)
require_True(t, ar.success)
require_Equal(t, msg.Reply, _EMPTY_)
Expand Down Expand Up @@ -3196,7 +3194,7 @@ func TestNRGDelayedMessagesAfterCatchupDontCountTowardQuorum(t *testing.T) {
// Should reply "success", this is the latest message.
msg, err := sub.NextMsg(500 * time.Millisecond)
require_NoError(t, err)
ar := n.decodeAppendEntryResponse(msg.Data)
ar := decodeAppendEntryResponse(msg.Data)
require_Equal(t, ar.index, 3)
require_True(t, ar.success)
require_Equal(t, msg.Reply, _EMPTY_)
Expand Down Expand Up @@ -3359,14 +3357,14 @@ func TestNRGLeaderCatchupHandling(t *testing.T) {
// Should receive all messages the leader knows up to this point.
msg, err := sub.NextMsg(500 * time.Millisecond)
require_NoError(t, err)
ae, err := n.decodeAppendEntry(msg.Data, nil, _EMPTY_)
ae, err := decodeAppendEntry(msg.Data, nil, _EMPTY_)
require_NoError(t, err)
require_Equal(t, ae.pterm, 1)
require_Equal(t, ae.pindex, 1)

msg, err = sub.NextMsg(500 * time.Millisecond)
require_NoError(t, err)
ae, err = n.decodeAppendEntry(msg.Data, nil, _EMPTY_)
ae, err = decodeAppendEntry(msg.Data, nil, _EMPTY_)
require_NoError(t, err)
require_Equal(t, ae.pterm, 1)
require_Equal(t, ae.pindex, 2)
Expand Down Expand Up @@ -3425,7 +3423,7 @@ func TestNRGNewEntriesFromOldLeaderResetsWALDuringCatchup(t *testing.T) {
// Should reply we have a higher term, prompting the server to step down.
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
ar := n.decodeAppendEntryResponse(msg.Data)
ar := decodeAppendEntryResponse(msg.Data)
require_False(t, ar.success)
require_Equal(t, ar.index, 1)
require_Equal(t, ar.term, 20)
Expand Down