From 2e2d40c6d0da7e84144724a4b86ce0f074ee8783 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 23 Jun 2025 11:20:20 +0100 Subject: [PATCH] NRG: Monotonic heartbeat and quorum tracking Converting the timestamps to `UnixNano` meant that all of these operations were using wall-clock rather than monotonic time, which meant that these operations were not safe against wall-clock drifts or NTP adjustments. This could result in unexpected loss of quorum. Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 8 +++--- server/monitor.go | 4 +-- server/raft.go | 53 +++++++++++++++++-------------------- 3 files changed, 31 insertions(+), 34 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 07b93e3a556..7438c707b11 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3208,12 +3208,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // Returns the PeerInfo for all replicas of a raft node. This is different than node.Peers() // and is used for external facing advisories. func (s *Server) replicas(node RaftNode) []*PeerInfo { - now := time.Now() var replicas []*PeerInfo for _, rp := range node.Peers() { if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { si := sir.(nodeInfo) - pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} + pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Offline: si.offline, Lag: rp.Lag} + if !rp.Last.IsZero() { + pi.Active = time.Since(rp.Last) + } replicas = append(replicas, pi) } } @@ -8874,7 +8876,7 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { for _, rp := range peers { if rp.ID != id && rg.isMember(rp.ID) { var lastSeen time.Duration - if now.After(rp.Last) && rp.Last.Unix() != 0 { + if now.After(rp.Last) && !rp.Last.IsZero() { lastSeen = now.Sub(rp.Last) } current := rp.Current diff --git a/server/monitor.go b/server/monitor.go index cf0bbecf5eb..e2a76b90f6a 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -4027,8 +4027,8 @@ func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus { Known: p.kp, LastReplicatedIndex: p.li, } - if p.ts > 0 { - peer.LastSeen = time.Since(time.Unix(0, p.ts)).String() + if !p.ts.IsZero() { + peer.LastSeen = time.Since(p.ts).String() } info.Peers[id] = peer } diff --git a/server/raft.go b/server/raft.go index 1e21eca47eb..084c9456347 100644 --- a/server/raft.go +++ b/server/raft.go @@ -240,9 +240,9 @@ type catchupState struct { // lps holds peer state of last time and last index replicated. type lps struct { - ts int64 // Last timestamp - li uint64 // Last index replicated - kp bool // Known peer + ts time.Time // Last timestamp + li uint64 // Last index replicated + kp bool // Known peer } const ( @@ -502,13 +502,13 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel } // Make sure to track ourselves. - n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true} + n.peers[n.id] = &lps{time.Now(), 0, true} // Track known peers for _, peer := range ps.knownPeers { if peer != n.id { // Set these to 0 to start but mark as known peer. - n.peers[peer] = &lps{0, 0, true} + n.peers[peer] = &lps{time.Time{}, 0, true} } } @@ -1446,9 +1446,8 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { // Check to see that we have heard from the current leader lately. if n.leader != noLeader && n.leader != n.id && n.catchup == nil { - okInterval := int64(hbInterval) * 2 - ts := time.Now().UnixNano() - if ps := n.peers[n.leader]; ps == nil || ps.ts == 0 && (ts-ps.ts) > okInterval { + okInterval := hbInterval * 2 + if ps := n.peers[n.leader]; ps == nil || time.Since(ps.ts) > okInterval { n.debug("Not current, no recent leader contact") return false } @@ -1586,14 +1585,12 @@ func (n *raft) StepDown(preferred ...string) error { preferred = nil } - nowts := time.Now().UnixNano() - // If we have a preferred check it first. if maybeLeader != noLeader { var isHealthy bool if ps, ok := n.peers[maybeLeader]; ok { si, ok := n.s.nodeToInfo.Load(maybeLeader) - isHealthy = ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3) + isHealthy = ok && !si.(nodeInfo).offline && time.Since(ps.ts) < hbInterval*3 } if !isHealthy { maybeLeader = noLeader @@ -1608,7 +1605,7 @@ func (n *raft) StepDown(preferred ...string) error { continue } si, ok := n.s.nodeToInfo.Load(peer) - isHealthy := ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3) + isHealthy := ok && !si.(nodeInfo).offline && time.Since(ps.ts) < hbInterval*3 if isHealthy { maybeLeader = peer break @@ -1733,7 +1730,7 @@ func (n *raft) Peers() []*Peer { p := &Peer{ ID: id, Current: id == n.leader || ps.li >= n.applied, - Last: time.Unix(0, ps.ts), + Last: ps.ts, Lag: lag, } peers = append(peers, p) @@ -2633,11 +2630,10 @@ func (n *raft) Quorum() bool { n.RLock() defer n.RUnlock() - now, nc := time.Now().UnixNano(), 0 + nc := 0 for id, peer := range n.peers { - if id == n.id || time.Duration(now-peer.ts) < lostQuorumInterval { - nc++ - if nc >= n.qn { + if id == n.id || time.Since(peer.ts) < lostQuorumInterval { + if nc++; nc >= n.qn { return true } } @@ -2659,11 +2655,10 @@ func (n *raft) lostQuorumLocked() bool { return false } - now, nc := time.Now().UnixNano(), 0 + nc := 0 for id, peer := range n.peers { - if id == n.id || time.Duration(now-peer.ts) < lostQuorumInterval { - nc++ - if nc >= n.qn { + if id == n.id || time.Since(peer.ts) < lostQuorumInterval { + if nc++; nc >= n.qn { return false } } @@ -2979,7 +2974,7 @@ func (n *raft) applyCommit(index uint64) error { if lp, ok := n.peers[newPeer]; !ok { // We are not tracking this one automatically so we need to bump cluster size. - n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, true} + n.peers[newPeer] = &lps{time.Now(), 0, true} } else { // Mark as added. lp.kp = true @@ -3135,9 +3130,9 @@ func (n *raft) trackPeer(peer string) error { } } if ps := n.peers[peer]; ps != nil { - ps.ts = time.Now().UnixNano() + ps.ts = time.Now() } else if !isRemoved { - n.peers[peer] = &lps{time.Now().UnixNano(), 0, false} + n.peers[peer] = &lps{time.Now(), 0, false} } n.Unlock() @@ -3432,9 +3427,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Track leader directly if isNew && ae.leader != noLeader { if ps := n.peers[ae.leader]; ps != nil { - ps.ts = time.Now().UnixNano() + ps.ts = time.Now() } else { - n.peers[ae.leader] = &lps{time.Now().UnixNano(), 0, true} + n.peers[ae.leader] = &lps{time.Now(), 0, true} } } @@ -3695,9 +3690,9 @@ CONTINUE: if newPeer := string(e.Data); len(newPeer) == idLen { // Track directly, but wait for commit to be official if ps := n.peers[newPeer]; ps != nil { - ps.ts = time.Now().UnixNano() + ps.ts = time.Now() } else { - n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false} + n.peers[newPeer] = &lps{time.Now(), 0, false} } // Store our peer in our global peer map for all peers. peers.LoadOrStore(newPeer, newPeer) @@ -3754,7 +3749,7 @@ func (n *raft) processPeerState(ps *peerState) { lp.kp = true n.peers[peer] = lp } else { - n.peers[peer] = &lps{0, 0, true} + n.peers[peer] = &lps{time.Time{}, 0, true} } } n.debug("Update peers from leader to %+v", n.peers)