Skip to content

Commit

Permalink
Add in utility to detect and delete any NRG orphans.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored and wallyqs committed Oct 10, 2023
1 parent 50722e9 commit 0ac7895
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
73 changes: 72 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,65 @@ func (js *jetStream) checkForOrphans() {
}
}

// checkForNRGOrphans scans the raft nodes (NRGs) registered on this server
// and deletes every one that is neither the meta group nor the group of a
// stream/consumer assignment that this server is a member of.
// It does nothing when JetStream clustering is not available, while the meta
// layer is still recovering, or when there is no meta node.
func (s *Server) checkForNRGOrphans() {
	js, cc := s.getJetStreamCluster()
	// Without a cluster there are no NRGs, and during recovery our view of
	// the assignments is not yet usable.
	if js == nil || cc == nil || js.isMetaRecovering() {
		return
	}

	// Set of raft group names that are expected to be running here.
	valid := make(map[string]struct{})
	keep := func(g *raftGroup) {
		// Only groups that actually carry a raft node (R>1) are tracked.
		if g.node == nil {
			return
		}
		valid[g.Name] = struct{}{}
	}

	js.mu.RLock()
	metaNode := cc.meta
	if metaNode == nil {
		// No meta node, nothing to compare against.
		js.mu.RUnlock()
		return
	}

	// The meta group itself is always legitimate.
	selfID := metaNode.ID()
	valid[metaNode.Group()] = struct{}{}

	// Walk every stream assignment, and the consumers beneath it, keeping
	// the groups we are a member of. Streams being restored are skipped
	// together with their consumers.
	for _, accStreams := range cc.streams {
		for _, sa := range accStreams {
			if !sa.Group.isMember(selfID) || sa.Restore != nil {
				continue
			}
			keep(sa.Group)
			for _, ca := range sa.consumers {
				if !ca.Group.isMember(selfID) {
					continue
				}
				keep(ca.Group)
			}
		}
	}
	js.mu.RUnlock()

	// Gather the running raft nodes that did not make it into the valid set.
	// Deletion happens after the raft node lock has been released.
	var orphans []RaftNode
	s.rnMu.RLock()
	for group, node := range s.raftNodes {
		if _, expected := valid[group]; expected {
			continue
		}
		orphans = append(orphans, node)
	}
	s.rnMu.RUnlock()

	for _, node := range orphans {
		s.Warnf("Detected orphaned NRG %q, will cleanup", node.Group())
		node.Delete()
	}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
Expand Down Expand Up @@ -1189,6 +1248,8 @@ func (js *jetStream) monitorCluster() {
if hs := s.healthz(nil); hs.Error != _EMPTY_ {
s.Warnf("%v", hs.Error)
}
// Also check for orphaned NRGs.
s.checkForNRGOrphans()
}

var (
Expand Down Expand Up @@ -1269,7 +1330,6 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
Expand All @@ -1280,6 +1340,8 @@ func (js *jetStream) monitorCluster() {
doSnapshot()
}
ce.ReturnToPool()
} else {
s.Warnf("Error applying JetStream cluster entries: %v", err)
}
}
aq.recycle(&ces)
Expand Down Expand Up @@ -2028,6 +2090,15 @@ func (mset *stream) removeNode() {
}
}

// clearRaftNode drops the stream's reference to its raft node by setting
// mset.node to nil while holding the stream lock. It only clears the
// reference; nothing is invoked on the node itself.
// Calling it on a nil stream is a no-op.
func (mset *stream) clearRaftNode() {
	if mset != nil {
		mset.mu.Lock()
		mset.node = nil
		mset.mu.Unlock()
	}
}

// Helper function to generate peer info.
// lists and sets for old and new.
func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) {
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5306,7 +5306,7 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Nowmal Stream
// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Expand Down

0 comments on commit 0ac7895

Please sign in to comment.