diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index b3538d22a65..0807c1d17ab 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2037,7 +2037,24 @@ retry:
 			samePeers = slices.Equal(groupPeerIDs, nodePeerIDs)
 		}
 		if !samePeers {
+			// At this point we have no way of knowing:
+			// 1. Whether the group has lost enough nodes to cause a quorum
+			//    loss, in which case a proposal may fail, therefore we
+			//    force a peerstate write;
+			// 2. Whether nodes in the group have other applies queued up
+			//    that could change the peerstate again, therefore the
+			//    leader should send out a new proposal anyway, just to
+			//    make sure that this change gets captured in the log.
 			node.UpdateKnownPeers(groupPeerIDs)
+
+			// If the peers changed as a result of an update by the meta layer, we must reflect that in the log of
+			// this group. Otherwise, a new peer would come up and instantly reset the peer state back to whatever is
+			// in the log at that time, overwriting what the meta layer told it.
+			// We will need to address this properly later on, for example by having the meta layer decide the new
+			// placement but letting the leader of this group propose it through its own log instead.
+			if node.Leader() {
+				node.ProposeKnownPeers(groupPeerIDs)
+			}
 		}
 		rg.node = node
 		js.mu.Unlock()
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index fa3e27f3ea5..685c94db20a 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -5088,3 +5088,109 @@ func TestJetStreamClusterTTLAndDedupe(t *testing.T) {
 	_, err = js.PublishMsg(m)
 	require_NoError(t, err)
 }
+
+func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R4S", 4)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Retention: nats.LimitsPolicy,
+		Subjects:  []string{"foo"},
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	var acc *Account
+	var mset *stream
+
+	// Wait for 3 of the 4 servers to have created the stream.
+	checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+		count := 0
+		for _, s := range c.servers {
+			acc, err = s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			_, err = acc.lookupStream("TEST")
+			if err != nil {
+				continue
+			}
+			count++
+		}
+		if count != 3 {
+			return fmt.Errorf("expected the stream on 3 servers, got: %d", count)
+		}
+		return nil
+	})
+
+	sl := c.streamLeader(globalAccountName, "TEST")
+
+	// Get a random server that:
+	// - is not the stream leader
+	// - is not the meta leader (peer-removing the meta leader has other issues)
+	// - already hosts the stream, so a peer-remove changes the stream's peer set
+	var rs *Server
+	for _, s := range c.servers {
+		acc, err = s.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		_, err = acc.lookupStream("TEST")
+		if s == sl || s.isMetaLeader.Load() || err != nil {
+			continue
+		}
+		rs = s
+		break
+	}
+	if rs == nil {
+		t.Fatal("No suitable server found that hosts the stream and is neither stream nor meta leader.")
+	}
+	rs.Shutdown()
+
+	// Peer-remove the selected server so the stream moves to the remaining empty server.
+	sc, err := nats.Connect(sl.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
+	require_NoError(t, err)
+	req := &JSApiMetaServerRemoveRequest{Server: rs.Name()}
+	jsreq, err := json.Marshal(req)
+	require_NoError(t, err)
+	_, err = sc.Request(JSApiRemoveServer, jsreq, time.Second)
+	require_NoError(t, err)
+
+	// Eventually there should again be an R3 stream and everyone should agree on the peers.
+	checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+		count := 0
+		var ps []string
+		for _, s := range c.servers {
+			if s == rs {
+				continue
+			}
+			acc, err = s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			mset, err = acc.lookupStream("TEST")
+			if err != nil {
+				return err
+			}
+			rn := mset.raftNode().(*raft)
+			rn.RLock()
+			peerNames := rn.peerNames()
+			rn.RUnlock()
+			slices.Sort(peerNames)
+			if count == 0 {
+				ps = peerNames
+			} else if !slices.Equal(ps, peerNames) {
+				rsid := rs.NodeName()
+				containsOld := slices.Contains(ps, rsid) || slices.Contains(peerNames, rsid)
+				return fmt.Errorf("no equal peers, expected: %v, got: %v, contains old peer (%s): %v", ps, peerNames, rsid, containsOld)
+			}
+			count++
+		}
+		if count != 3 {
+			return fmt.Errorf("expected 3 servers hosting the stream, got: %d", count)
+		}
+		return nil
+	})
+}
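
Illustrative sketch (not part of the patch): a minimal, self-contained Go version of the pattern the jetstream_cluster.go hunk applies, using a hypothetical raftGroup interface and fakeGroup type rather than the server's actual RaftNode implementation. The meta-layer-supplied peer set is always written to the local peerstate, and only a current leader additionally proposes it through the group's own log, so a peer replaying that log cannot reset the peer set to a stale entry.

package main

import (
	"fmt"
	"slices"
)

// raftGroup is a hypothetical, simplified stand-in for the server's raft
// node; the real methods live on the RaftNode interface in nats-server.
type raftGroup interface {
	Leader() bool
	UpdateKnownPeers(peers []string)  // update the local peerstate only
	ProposeKnownPeers(peers []string) // replicate the change via the group's log
}

// applyMetaPeerChange mirrors the hunk above: always force the local
// peerstate write (a proposal alone may fail without quorum), and have the
// leader also propose the new peer set so the change is captured in the log
// and survives a new peer replaying that log.
func applyMetaPeerChange(n raftGroup, current, desired []string) {
	if slices.Equal(current, desired) {
		return // peer set unchanged, nothing to do
	}
	n.UpdateKnownPeers(desired)
	if n.Leader() {
		n.ProposeKnownPeers(desired)
	}
}

// fakeGroup is a toy in-memory implementation used only to exercise the flow.
type fakeGroup struct {
	leader bool
	peers  []string
	log    [][]string
}

func (g *fakeGroup) Leader() bool                     { return g.leader }
func (g *fakeGroup) UpdateKnownPeers(peers []string)  { g.peers = slices.Clone(peers) }
func (g *fakeGroup) ProposeKnownPeers(peers []string) { g.log = append(g.log, slices.Clone(peers)) }

func main() {
	g := &fakeGroup{leader: true, peers: []string{"S1", "S2", "S3"}}
	applyMetaPeerChange(g, g.peers, []string{"S1", "S2", "S4"})
	// On a leader, both the local peerstate and the log reflect the change.
	fmt.Println("peers:", g.peers, "proposals:", g.log)
}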