diff --git a/server/norace_1_test.go b/server/norace_1_test.go index 75b6b70cee2..0afe648df0a 100644 --- a/server/norace_1_test.go +++ b/server/norace_1_test.go @@ -3606,7 +3606,7 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - ae, err := node.decodeAppendEntry(sm.msg, nil, _EMPTY_) + ae, err := decodeAppendEntry(sm.msg, nil, _EMPTY_) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/server/raft.go b/server/raft.go index 0191827e8e8..04ec1b4cadc 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2424,7 +2424,7 @@ func (ae *appendEntry) encode(b []byte) ([]byte, error) { } // This can not be used post the wire level callback since we do not copy. -func (n *raft) decodeAppendEntry(msg []byte, sub *subscription, reply string) (*appendEntry, error) { +func decodeAppendEntry(msg []byte, sub *subscription, reply string) (*appendEntry, error) { if len(msg) < appendEntryBaseLen { return nil, errBadAppendEntry } @@ -2509,7 +2509,7 @@ func (ar *appendEntryResponse) encode(b []byte) []byte { // Track all peers we may have ever seen to use an string interns for appendEntryResponse decoding. var peers sync.Map -func (n *raft) decodeAppendEntryResponse(msg []byte) *appendEntryResponse { +func decodeAppendEntryResponse(msg []byte) *appendEntryResponse { if len(msg) != appendEntryResponseLen { return nil } @@ -2972,7 +2972,7 @@ func (n *raft) loadEntry(index uint64) (*appendEntry, error) { if err != nil { return nil, err } - return n.decodeAppendEntry(sm.msg, nil, _EMPTY_) + return decodeAppendEntry(sm.msg, nil, _EMPTY_) } // applyCommit will update our commit index and apply the entry to the apply queue. @@ -3302,7 +3302,7 @@ func (n *raft) runAsCandidate() { // is an internal callback from the "asubj" append entry subscription. func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, _, reply string, msg []byte) { msg = copyBytes(msg) - if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil { + if ae, err := decodeAppendEntry(msg, sub, reply); err == nil { // Push to the new entry channel. From here one of the worker // goroutines (runAsLeader, runAsFollower, runAsCandidate) will // pick it up. @@ -3894,7 +3894,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { // handleAppendEntryResponse processes responses to append entries. func (n *raft) handleAppendEntryResponse(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { - ar := n.decodeAppendEntryResponse(msg) + ar := decodeAppendEntryResponse(msg) ar.reply = reply n.resp.push(ar) } diff --git a/server/raft_test.go b/server/raft_test.go index 4d97367dd29..3fdb3626d30 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -133,9 +133,8 @@ func TestNRGAppendEntryDecode(t *testing.T) { require_NoError(t, err) // Truncate buffer first. - var node *raft short := buf[0 : len(buf)-1025] - _, err = node.decodeAppendEntry(short, nil, _EMPTY_) + _, err = decodeAppendEntry(short, nil, _EMPTY_) require_Error(t, err, errBadAppendEntry) for i := 0; i < 100; i++ { @@ -144,7 +143,7 @@ func TestNRGAppendEntryDecode(t *testing.T) { bi := 42 + rand.Intn(len(b)-42) if b[bi] != 0 && bi != 40 { b[bi] = 0 - _, err = node.decodeAppendEntry(b, nil, _EMPTY_) + _, err = decodeAppendEntry(b, nil, _EMPTY_) require_Error(t, err, errBadAppendEntry) } } @@ -279,7 +278,7 @@ func TestNRGAEFromOldLeader(t *testing.T) { require_NoError(t, err) // Wait for the response, the server should have rejected it. - ar := leader.decodeAppendEntryResponse(resp.Data) + ar := decodeAppendEntryResponse(resp.Data) require_NotNil(t, ar) require_Equal(t, ar.success, false) @@ -400,8 +399,7 @@ func TestNRGLeaderTransfer(t *testing.T) { return err } - leader := newLeader.node().(*raft) - ae, err := leader.decodeAppendEntry(msg.Data, nil, msg.Reply) + ae, err := decodeAppendEntry(msg.Data, nil, msg.Reply) if err != nil { return err } @@ -473,7 +471,7 @@ func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) { // We're going to modify the append entry that we received so that // we can send it again with modifications. - ae, err := leader.decodeAppendEntry(msg.Data, nil, msg.Reply) + ae, err := decodeAppendEntry(msg.Data, nil, msg.Reply) require_NoError(t, err) // First of all we're going to try sending an append entry that @@ -931,7 +929,7 @@ func TestNRGNoResetOnAppendEntryResponse(t *testing.T) { // The higher term in this case is what would cause the leader previously // to reset the entire log which it shouldn't do. _, err := nc.Subscribe(fmt.Sprintf(raftAppendSubj, "TEST"), func(msg *nats.Msg) { - if ae, err := follower.decodeAppendEntry(msg.Data, nil, msg.Reply); err == nil && len(ae.entries) > 0 { + if ae, err := decodeAppendEntry(msg.Data, nil, msg.Reply); err == nil && len(ae.entries) > 0 { ar := newAppendEntryResponse(ae.term+1, ae.commit, follower.id, false) require_NoError(t, msg.Respond(ar.encode(nil))) } @@ -1023,7 +1021,7 @@ func TestNRGCandidateDontStepdownDueToLeaderOfPreviousTerm(t *testing.T) { defer rg.unlockAll() // Decode the append entry - ae, err := leader.decodeAppendEntry(msg.Data, nil, msg.Reply) + ae, err := decodeAppendEntry(msg.Data, nil, msg.Reply) require_NoError(t, err) // Check that the append entry is from the leader @@ -2434,7 +2432,7 @@ func TestNRGCatchupDontCountTowardQuorum(t *testing.T) { // Should reply we require catchup. msg, err := sub.NextMsg(time.Second) require_NoError(t, err) - ar := n.decodeAppendEntryResponse(msg.Data) + ar := decodeAppendEntryResponse(msg.Data) require_Equal(t, ar.index, 0) require_False(t, ar.success) require_True(t, strings.HasPrefix(msg.Reply, "$NRG.CR")) @@ -2452,7 +2450,7 @@ func TestNRGCatchupDontCountTowardQuorum(t *testing.T) { n.processAppendEntry(aeHeartbeat, n.aesub) msg, err = sub.NextMsg(time.Second) require_NoError(t, err) - ar = n.decodeAppendEntryResponse(msg.Data) + ar = decodeAppendEntryResponse(msg.Data) require_Equal(t, ar.index, aeHeartbeat.pindex) require_True(t, ar.success) require_Equal(t, msg.Reply, _EMPTY_) @@ -3196,7 +3194,7 @@ func TestNRGDelayedMessagesAfterCatchupDontCountTowardQuorum(t *testing.T) { // Should reply "success", this is the latest message. msg, err := sub.NextMsg(500 * time.Millisecond) require_NoError(t, err) - ar := n.decodeAppendEntryResponse(msg.Data) + ar := decodeAppendEntryResponse(msg.Data) require_Equal(t, ar.index, 3) require_True(t, ar.success) require_Equal(t, msg.Reply, _EMPTY_) @@ -3359,14 +3357,14 @@ func TestNRGLeaderCatchupHandling(t *testing.T) { // Should receive all messages the leader knows up to this point. msg, err := sub.NextMsg(500 * time.Millisecond) require_NoError(t, err) - ae, err := n.decodeAppendEntry(msg.Data, nil, _EMPTY_) + ae, err := decodeAppendEntry(msg.Data, nil, _EMPTY_) require_NoError(t, err) require_Equal(t, ae.pterm, 1) require_Equal(t, ae.pindex, 1) msg, err = sub.NextMsg(500 * time.Millisecond) require_NoError(t, err) - ae, err = n.decodeAppendEntry(msg.Data, nil, _EMPTY_) + ae, err = decodeAppendEntry(msg.Data, nil, _EMPTY_) require_NoError(t, err) require_Equal(t, ae.pterm, 1) require_Equal(t, ae.pindex, 2) @@ -3425,7 +3423,7 @@ func TestNRGNewEntriesFromOldLeaderResetsWALDuringCatchup(t *testing.T) { // Should reply we have a higher term, prompting the server to step down. msg, err := sub.NextMsg(time.Second) require_NoError(t, err) - ar := n.decodeAppendEntryResponse(msg.Data) + ar := decodeAppendEntryResponse(msg.Data) require_False(t, ar.success) require_Equal(t, ar.index, 1) require_Equal(t, ar.term, 20)