Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5590,6 +5590,13 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
js.mu.Lock()
defer js.mu.Unlock()

if isLeader {
if meta := js.cluster.meta; meta != nil && meta.IsObserver() {
meta.StepDown()
return
}
}

if isLeader {
js.startUpdatesSub()
} else {
Expand Down
65 changes: 65 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5256,3 +5256,68 @@ func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) {
return nil
})
}

func TestJetStreamClusterObserverNotElectedMetaLeader(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

c.waitOnLeader()

getMeta := func(s *Server) *raft {
if js := s.getJetStream(); js != nil {
if mg := js.getMetaGroup(); mg != nil {
return mg.(*raft)
}
}
return nil
}

setToObserverAndStepDown := func(s *Server) {
if meta := getMeta(s); meta != nil {
meta.setObserver(true, extExtended)
meta.StepDown()
}
}

var wg sync.WaitGroup

for range 10 {
// Pick what will be the new leader since we are going to switch
// the 2 other servers to observer mode and make them step down.
newLeader := c.randomNonLeader()
leader := c.leader()

var other *Server
for _, s := range c.servers {
if s != newLeader && s != leader {
other = s
break
}
}

wg.Add(1)
go func() {
defer wg.Done()
// Add some random delay before changing state and stepping down.
time.Sleep(time.Duration(rand.Intn(25)) * time.Millisecond)
setToObserverAndStepDown(other)
}()
setToObserverAndStepDown(leader)
wg.Wait()

// Wait for the newLeader to really be elected.
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
if !newLeader.JetStreamIsLeader() {
return fmt.Errorf("Server %q is still not leader", newLeader)
}
return nil
})

// Change the observer back to false.
for _, s := range []*Server{leader, other} {
if meta := getMeta(s); meta != nil {
meta.SetObserver(false)
}
}
}
}
Loading