diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1eabc7dfcb6..8753f363fd4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5590,6 +5590,13 @@ func (js *jetStream) processLeaderChange(isLeader bool) { js.mu.Lock() defer js.mu.Unlock() + if isLeader { + if meta := js.cluster.meta; meta != nil && meta.IsObserver() { + meta.StepDown() + return + } + } + if isLeader { js.startUpdatesSub() } else { diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 698bb5916d5..774e9dc70f8 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -5256,3 +5256,68 @@ func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) { return nil }) } + +func TestJetStreamClusterObserverNotElectedMetaLeader(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + c.waitOnLeader() + + getMeta := func(s *Server) *raft { + if js := s.getJetStream(); js != nil { + if mg := js.getMetaGroup(); mg != nil { + return mg.(*raft) + } + } + return nil + } + + setToObserverAndStepDown := func(s *Server) { + if meta := getMeta(s); meta != nil { + meta.setObserver(true, extExtended) + meta.StepDown() + } + } + + var wg sync.WaitGroup + + for range 10 { + // Pick what will be the new leader since we are going to switch + // the 2 other servers to observer mode and make them step down. + newLeader := c.randomNonLeader() + leader := c.leader() + + var other *Server + for _, s := range c.servers { + if s != newLeader && s != leader { + other = s + break + } + } + + wg.Add(1) + go func() { + defer wg.Done() + // Add some random delay before changing state and stepping down. + time.Sleep(time.Duration(rand.Intn(25)) * time.Millisecond) + setToObserverAndStepDown(other) + }() + setToObserverAndStepDown(leader) + wg.Wait() + + // Wait for the newLeader to really be elected. + checkFor(t, 10*time.Second, 50*time.Millisecond, func() error { + if !newLeader.JetStreamIsLeader() { + return fmt.Errorf("Server %q is still not leader", newLeader) + } + return nil + }) + + // Change the observer back to false. + for _, s := range []*Server{leader, other} { + if meta := getMeta(s); meta != nil { + meta.SetObserver(false) + } + } + } +}