diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3d987770924..1eabc7dfcb6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4352,28 +4352,18 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state // Check if we already have this consumer running. o := mset.lookupConsumer(consumer) - if !alreadyRunning { - // Process the raft group and make sure its running if needed. - storage := mset.config().Storage - if ca.Config.MemoryStorage { - storage = MemoryStorage - } - // No-op if R1. - js.createRaftGroup(accName, rg, storage, pprofLabels{ - "type": "consumer", - "account": mset.accName(), - "stream": ca.Stream, - "consumer": ca.Name, - }) - } else { - // If we are clustered update the known peers. - js.mu.RLock() - node := rg.node - js.mu.RUnlock() - if node != nil { - node.UpdateKnownPeers(ca.Group.Peers) - } + // Process the raft group and make sure it's running if needed. + storage := mset.config().Storage + if ca.Config.MemoryStorage { + storage = MemoryStorage } + // No-op if R1. + js.createRaftGroup(accName, rg, storage, pprofLabels{ + "type": "consumer", + "account": mset.accName(), + "stream": ca.Stream, + "consumer": ca.Name, + }) // Check if we already have this consumer running. var didCreate, isConfigUpdate, needsLocalResponse bool diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 79c8ea5ac51..698bb5916d5 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -5139,6 +5139,12 @@ func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) { }) require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 3, + }) + require_NoError(t, err) + var acc *Account var mset *stream @@ -5150,14 +5156,18 @@ func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) { if err != nil { return err } - _, err = acc.lookupStream("TEST") + mset, err = acc.lookupStream("TEST") if err != nil { continue } + o := mset.lookupConsumer("CONSUMER") + if o == nil { + continue + } count++ } if count != 3 { - return fmt.Errorf("expected 3 streams, got: %d", count) + return fmt.Errorf("expected 3 streams/consumers, got: %d", count) } return nil }) @@ -5196,7 +5206,8 @@ func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) { // Eventually there should again be a R3 stream and everyone should agree on the peers. checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { count := 0 - var ps []string + var streamPeers []string + var consumerPeers []string for _, s := range c.servers { if s == rs { continue @@ -5209,22 +5220,38 @@ func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) { if err != nil { return err } - rn := mset.raftNode().(*raft) - rn.RLock() - peerNames := rn.peerNames() - rn.RUnlock() - slices.Sort(peerNames) + o := mset.lookupConsumer("CONSUMER") + if o == nil { + return fmt.Errorf("consumer not found on %s", s.Name()) + } + mrn := mset.raftNode().(*raft) + mrn.RLock() + streamPeerNames := mrn.peerNames() + mrn.RUnlock() + + orn := o.raftNode().(*raft) + orn.RLock() + consumerPeerNames := orn.peerNames() + orn.RUnlock() + + slices.Sort(streamPeerNames) + slices.Sort(consumerPeerNames) if count == 0 { - ps = peerNames - } else if !slices.Equal(ps, peerNames) { + streamPeers = streamPeerNames + consumerPeers = consumerPeerNames + } else if !slices.Equal(streamPeers, streamPeerNames) { + rsid := rs.NodeName() + containsOld := slices.Contains(streamPeers, rsid) || slices.Contains(streamPeerNames, rsid) + return fmt.Errorf("no equal stream peers, expected: %v, got: %v, contains old peer (%s): %v", streamPeers, streamPeerNames, rsid, containsOld) + } else if !slices.Equal(consumerPeers, consumerPeerNames) { rsid := rs.NodeName() - containsOld := slices.Contains(ps, rsid) || slices.Contains(peerNames, rsid) - return fmt.Errorf("no equal peers, expected: %v, got: %v, contains old peer (%s): %v", ps, peerNames, rsid, containsOld) + containsOld := slices.Contains(consumerPeers, rsid) || slices.Contains(consumerPeerNames, rsid) + return fmt.Errorf("no equal consumer peers, expected: %v, got: %v, contains old peer (%s): %v", consumerPeers, consumerPeerNames, rsid, containsOld) } count++ } if count != 3 { - return fmt.Errorf("expected 3 servers hosting stream, got: %d", count) + return fmt.Errorf("expected 3 servers hosting stream/consumer, got: %d", count) } return nil })