diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go
index bb2337d7f47..f0fa78881f7 100644
--- a/server/jetstream_leafnode_test.go
+++ b/server/jetstream_leafnode_test.go
@@ -1242,3 +1242,85 @@ func TestJetStreamLeafNodeSvcImportExportCycle(t *testing.T) {
 	_, err = js.Publish("foo", []byte("msg"))
 	require_NoError(t, err)
 }
+
+func TestJetStreamLeafNodeJSClusterMigrateRecovery(t *testing.T) {
+	tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: hub, store_dir:", 1)
+	c := createJetStreamCluster(t, tmpl, "hub", _EMPTY_, 3, 12232, true)
+	defer c.shutdown()
+
+	tmpl = strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", "domain: leaf, store_dir:", 1)
+	lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "leaf", 3, 23913)
+	defer lnc.shutdown()
+
+	lnc.waitOnClusterReady()
+	for _, s := range lnc.servers {
+		s.setJetStreamMigrateOnRemoteLeaf()
+	}
+
+	nc, _ := jsClientConnect(t, lnc.randomServer())
+	defer nc.Close()
+
+	ljs, err := nc.JetStream(nats.Domain("leaf"))
+	require_NoError(t, err)
+
+	// Create an asset in the leafnode cluster.
+	si, err := ljs.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	require_Equal(t, si.Cluster.Name, "leaf")
+	require_NotEqual(t, si.Cluster.Leader, noLeader)
+	require_Equal(t, len(si.Cluster.Replicas), 2)
+
+	// Count how many remotes each server in the leafnode cluster is
+	// supposed to have and then take them down.
+	remotes := map[*Server]int{}
+	for _, s := range lnc.servers {
+		remotes[s] += len(s.leafRemoteCfgs)
+		s.closeAndDisableLeafnodes()
+		checkLeafNodeConnectedCount(t, s, 0)
+	}
+
+	// The Raft nodes in the leafnode cluster now need some time to
+	// notice that they're no longer receiving AEs from a leader, as
+	// they should have been forced into observer mode. Check that
+	// this is the case.
+	time.Sleep(maxElectionTimeout)
+	for _, s := range lnc.servers {
+		s.rnMu.RLock()
+		for name, n := range s.raftNodes {
+			// We don't expect the metagroup to have turned into an
+			// observer but all other assets should have done.
+			if name == defaultMetaGroupName {
+				require_False(t, n.IsObserver())
+			} else {
+				require_True(t, n.IsObserver())
+			}
+		}
+		s.rnMu.RUnlock()
+	}
+
+	// Bring the leafnode connections back up.
+	for _, s := range lnc.servers {
+		s.reEnableLeafnodes()
+		checkLeafNodeConnectedCount(t, s, remotes[s])
+	}
+
+	// Wait for the nodes to notice that the leafnode connections are
+	// back up and to leave observer mode.
+	time.Sleep(maxElectionTimeout)
+	for _, s := range lnc.servers {
+		s.rnMu.RLock()
+		for _, n := range s.raftNodes {
+			require_False(t, n.IsObserver())
+		}
+		s.rnMu.RUnlock()
+	}
+
+	// Previously nodes would have left observer mode but then would
+	// have failed to elect a stream leader, as they were stuck on a
+	// long election timer. Now this should work reliably.
+	lnc.waitOnStreamLeader(globalAccountName, "TEST")
+}
diff --git a/server/raft.go b/server/raft.go
index 470ee9da824..c748470260d 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -235,16 +235,18 @@ const (
 	hbIntervalDefault              = 1 * time.Second
 	lostQuorumIntervalDefault      = hbIntervalDefault * 10 // 10 seconds
 	lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds
+	observerModeIntervalDefault    = 48 * time.Hour
 )
 
 var (
-	minElectionTimeout = minElectionTimeoutDefault
-	maxElectionTimeout = maxElectionTimeoutDefault
-	minCampaignTimeout = minCampaignTimeoutDefault
-	maxCampaignTimeout = maxCampaignTimeoutDefault
-	hbInterval         = hbIntervalDefault
-	lostQuorumInterval = lostQuorumIntervalDefault
-	lostQuorumCheck    = lostQuorumCheckIntervalDefault
+	minElectionTimeout   = minElectionTimeoutDefault
+	maxElectionTimeout   = maxElectionTimeoutDefault
+	minCampaignTimeout   = minCampaignTimeoutDefault
+	maxCampaignTimeout   = maxCampaignTimeoutDefault
+	hbInterval           = hbIntervalDefault
+	lostQuorumInterval   = lostQuorumIntervalDefault
+	lostQuorumCheck      = lostQuorumCheckIntervalDefault
+	observerModeInterval = observerModeIntervalDefault
 )
 
 type RaftConfig struct {
@@ -875,7 +877,7 @@ func (n *raft) PauseApply() error {
 	n.hcommit = n.commit
 	// Also prevent us from trying to become a leader while paused and catching up.
 	n.pobserver, n.observer = n.observer, true
-	n.resetElect(48 * time.Hour)
+	n.resetElect(observerModeInterval)
 
 	return nil
 }
@@ -1895,8 +1897,16 @@ func (n *raft) SetObserver(isObserver bool) {
 func (n *raft) setObserver(isObserver bool, extSt extensionState) {
 	n.Lock()
 	defer n.Unlock()
+
+	wasObserver := n.observer
 	n.observer = isObserver
 	n.extSt = extSt
+
+	// If we're leaving observer state then reset the election timer or
+	// we might end up waiting for up to the observerModeInterval.
+	if wasObserver && !isObserver {
+		n.resetElect(randCampaignTimeout())
+	}
 }
 
 // processAppendEntries is called by the Raft state machine when there are
@@ -1946,7 +1956,7 @@ func (n *raft) runAsFollower() {
 				n.resetElectionTimeoutWithLock()
 				n.debug("Not switching to candidate, no resources")
 			} else if n.IsObserver() {
-				n.resetElectWithLock(48 * time.Hour)
+				n.resetElectWithLock(observerModeInterval)
 				n.debug("Not switching to candidate, observer only")
 			} else if n.isCatchingUp() {
 				n.debug("Not switching to candidate, catching up")
diff --git a/server/raft_test.go b/server/raft_test.go
index 06639abe16b..5e5ab4b630c 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -489,3 +489,32 @@ func TestNRGHeartbeatOnLeaderChange(t *testing.T) {
 		rg.waitOnLeader()
 	}
 }
+
+func TestNRGElectionTimerAfterObserver(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
+	rg.waitOnLeader()
+
+	for _, n := range rg {
+		n.node().SetObserver(true)
+	}
+
+	time.Sleep(maxElectionTimeout)
+	before := time.Now()
+
+	for _, n := range rg {
+		n.node().SetObserver(false)
+	}
+
+	time.Sleep(maxCampaignTimeout)
+
+	for _, n := range rg {
+		rn := n.node().(*raft)
+		rn.RLock()
+		etlr := rn.etlr
+		rn.RUnlock()
+		require_True(t, etlr.After(before))
+	}
+}
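
Reviewer note: the sketch below distills the timer handoff that the setObserver() change above implements. It is a self-contained toy, not server code — the node type, its resetElect() helper, and the shortened intervals are illustrative stand-ins for the raft struct, its election timer, and the real observerModeInterval and randCampaignTimeout() values — but it shows why leaving observer mode must re-arm the election timer with a short campaign timeout rather than letting the remainder of the long observer interval run out.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Illustrative stand-ins: the real values in raft.go are 48h for the
// observer interval and millisecond-scale campaign timeouts.
const (
	observerModeInterval = 5 * time.Second
	minCampaignTimeout   = 100 * time.Millisecond
	maxCampaignTimeout   = 200 * time.Millisecond
)

// node models just enough of a Raft member to show the timer handoff.
type node struct {
	sync.Mutex
	observer bool
	elect    *time.Timer
}

// randCampaignTimeout picks a randomized timeout so that peers don't
// all campaign at the same instant, mirroring the helper in raft.go.
func randCampaignTimeout() time.Duration {
	delta := rand.Int63n(int64(maxCampaignTimeout - minCampaignTimeout))
	return minCampaignTimeout + time.Duration(delta)
}

// resetElect stops, drains and re-arms the election timer.
func (n *node) resetElect(d time.Duration) {
	if !n.elect.Stop() {
		select {
		case <-n.elect.C:
		default:
		}
	}
	n.elect.Reset(d)
}

// setObserver flips the flag and, crucially, re-arms the election
// timer when leaving observer mode. Without that reset the node would
// sleep out the rest of the long observer interval before campaigning.
// (In the server the long re-arm happens in runAsFollower; it is
// condensed into one function here for brevity.)
func (n *node) setObserver(isObserver bool) {
	n.Lock()
	defer n.Unlock()
	wasObserver := n.observer
	n.observer = isObserver
	if isObserver {
		n.resetElect(observerModeInterval)
	} else if wasObserver {
		n.resetElect(randCampaignTimeout())
	}
}

func main() {
	n := &node{elect: time.NewTimer(randCampaignTimeout())}
	n.setObserver(true)  // parked on the long observer interval
	n.setObserver(false) // timer re-armed with a short campaign timeout
	start := time.Now()
	<-n.elect.C
	fmt.Printf("would campaign after %v\n", time.Since(start))
}

This is also what TestNRGElectionTimerAfterObserver asserts above: after SetObserver(false), the recorded election timer reset time (etlr) must be later than the moment the nodes were still observers, proving the timer was genuinely re-armed.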