Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3208,12 +3208,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Returns the PeerInfo for all replicas of a raft node. This is different than node.Peers()
// and is used for external facing advisories.
func (s *Server) replicas(node RaftNode) []*PeerInfo {
now := time.Now()
var replicas []*PeerInfo
for _, rp := range node.Peers() {
if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil {
si := sir.(nodeInfo)
pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag}
pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Offline: si.offline, Lag: rp.Lag}
if !rp.Last.IsZero() {
pi.Active = time.Since(rp.Last)
}
replicas = append(replicas, pi)
}
}
Expand Down Expand Up @@ -8874,7 +8876,7 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
for _, rp := range peers {
if rp.ID != id && rg.isMember(rp.ID) {
var lastSeen time.Duration
if now.After(rp.Last) && rp.Last.Unix() != 0 {
if now.After(rp.Last) && !rp.Last.IsZero() {
lastSeen = now.Sub(rp.Last)
}
current := rp.Current
Expand Down
4 changes: 2 additions & 2 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4027,8 +4027,8 @@ func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
Known: p.kp,
LastReplicatedIndex: p.li,
}
if p.ts > 0 {
peer.LastSeen = time.Since(time.Unix(0, p.ts)).String()
if !p.ts.IsZero() {
peer.LastSeen = time.Since(p.ts).String()
}
info.Peers[id] = peer
}
Expand Down
53 changes: 24 additions & 29 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ type catchupState struct {

// lps holds peer state of last time and last index replicated.
type lps struct {
ts int64 // Last timestamp
li uint64 // Last index replicated
kp bool // Known peer
ts time.Time // Last timestamp
li uint64 // Last index replicated
kp bool // Known peer
}

const (
Expand Down Expand Up @@ -502,13 +502,13 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
}

// Make sure to track ourselves.
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}
n.peers[n.id] = &lps{time.Now(), 0, true}

// Track known peers
for _, peer := range ps.knownPeers {
if peer != n.id {
// Set these to 0 to start but mark as known peer.
n.peers[peer] = &lps{0, 0, true}
n.peers[peer] = &lps{time.Time{}, 0, true}
}
}

Expand Down Expand Up @@ -1446,9 +1446,8 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {

// Check to see that we have heard from the current leader lately.
if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
okInterval := int64(hbInterval) * 2
ts := time.Now().UnixNano()
if ps := n.peers[n.leader]; ps == nil || ps.ts == 0 && (ts-ps.ts) > okInterval {
okInterval := hbInterval * 2
if ps := n.peers[n.leader]; ps == nil || time.Since(ps.ts) > okInterval {
n.debug("Not current, no recent leader contact")
return false
}
Expand Down Expand Up @@ -1586,14 +1585,12 @@ func (n *raft) StepDown(preferred ...string) error {
preferred = nil
}

nowts := time.Now().UnixNano()

// If we have a preferred check it first.
if maybeLeader != noLeader {
var isHealthy bool
if ps, ok := n.peers[maybeLeader]; ok {
si, ok := n.s.nodeToInfo.Load(maybeLeader)
isHealthy = ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3)
isHealthy = ok && !si.(nodeInfo).offline && time.Since(ps.ts) < hbInterval*3
}
if !isHealthy {
maybeLeader = noLeader
Expand All @@ -1608,7 +1605,7 @@ func (n *raft) StepDown(preferred ...string) error {
continue
}
si, ok := n.s.nodeToInfo.Load(peer)
isHealthy := ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3)
isHealthy := ok && !si.(nodeInfo).offline && time.Since(ps.ts) < hbInterval*3
if isHealthy {
maybeLeader = peer
break
Expand Down Expand Up @@ -1733,7 +1730,7 @@ func (n *raft) Peers() []*Peer {
p := &Peer{
ID: id,
Current: id == n.leader || ps.li >= n.applied,
Last: time.Unix(0, ps.ts),
Last: ps.ts,
Lag: lag,
}
peers = append(peers, p)
Expand Down Expand Up @@ -2633,11 +2630,10 @@ func (n *raft) Quorum() bool {
n.RLock()
defer n.RUnlock()

now, nc := time.Now().UnixNano(), 0
nc := 0
for id, peer := range n.peers {
if id == n.id || time.Duration(now-peer.ts) < lostQuorumInterval {
nc++
if nc >= n.qn {
if id == n.id || time.Since(peer.ts) < lostQuorumInterval {
if nc++; nc >= n.qn {
return true
}
}
Expand All @@ -2659,11 +2655,10 @@ func (n *raft) lostQuorumLocked() bool {
return false
}

now, nc := time.Now().UnixNano(), 0
nc := 0
for id, peer := range n.peers {
if id == n.id || time.Duration(now-peer.ts) < lostQuorumInterval {
nc++
if nc >= n.qn {
if id == n.id || time.Since(peer.ts) < lostQuorumInterval {
if nc++; nc >= n.qn {
return false
}
}
Expand Down Expand Up @@ -2979,7 +2974,7 @@ func (n *raft) applyCommit(index uint64) error {

if lp, ok := n.peers[newPeer]; !ok {
// We are not tracking this one automatically so we need to bump cluster size.
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, true}
n.peers[newPeer] = &lps{time.Now(), 0, true}
} else {
// Mark as added.
lp.kp = true
Expand Down Expand Up @@ -3135,9 +3130,9 @@ func (n *raft) trackPeer(peer string) error {
}
}
if ps := n.peers[peer]; ps != nil {
ps.ts = time.Now().UnixNano()
ps.ts = time.Now()
} else if !isRemoved {
n.peers[peer] = &lps{time.Now().UnixNano(), 0, false}
n.peers[peer] = &lps{time.Now(), 0, false}
}
n.Unlock()

Expand Down Expand Up @@ -3432,9 +3427,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Track leader directly
if isNew && ae.leader != noLeader {
if ps := n.peers[ae.leader]; ps != nil {
ps.ts = time.Now().UnixNano()
ps.ts = time.Now()
} else {
n.peers[ae.leader] = &lps{time.Now().UnixNano(), 0, true}
n.peers[ae.leader] = &lps{time.Now(), 0, true}
}
}

Expand Down Expand Up @@ -3695,9 +3690,9 @@ CONTINUE:
if newPeer := string(e.Data); len(newPeer) == idLen {
// Track directly, but wait for commit to be official
if ps := n.peers[newPeer]; ps != nil {
ps.ts = time.Now().UnixNano()
ps.ts = time.Now()
} else {
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false}
n.peers[newPeer] = &lps{time.Now(), 0, false}
}
// Store our peer in our global peer map for all peers.
peers.LoadOrStore(newPeer, newPeer)
Expand Down Expand Up @@ -3754,7 +3749,7 @@ func (n *raft) processPeerState(ps *peerState) {
lp.kp = true
n.peers[peer] = lp
} else {
n.peers[peer] = &lps{0, 0, true}
n.peers[peer] = &lps{time.Time{}, 0, true}
}
}
n.debug("Update peers from leader to %+v", n.peers)
Expand Down