diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 471132fc7c5..17d441e8f43 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7753,8 +7753,15 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec if rBefore < rAfter { newPeerSet := nca.Group.Peers - // scale up by adding new members from the stream peer set that are not yet in the consumer peer set + // Scale up by adding new members from the stream peer set that are not yet in the consumer peer set. streamPeerSet := copyStrings(sa.Group.Peers) + + // Respond with error when there is a config mismatch between the intended config and expected peer size. + if len(streamPeerSet) < rAfter { + resp.Error = NewJSConsumerReplicasExceedsStreamError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) + return + } rand.Shuffle(rAfter, func(i, j int) { streamPeerSet[i], streamPeerSet[j] = streamPeerSet[j], streamPeerSet[i] }) for _, p := range streamPeerSet { found := false diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index c35bb4bdd7d..8179647986a 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -6643,3 +6643,72 @@ func TestJetStreamClusterSDMMaxAgeProposeExpiryShortRetry(t *testing.T) { }) } } + +func TestJetStreamClusterInvalidR1Config(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R1TEST", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.servers[0]) + defer nc.Close() + + nc2, js2 := jsClientConnect(t, c.servers[2]) + defer nc2.Close() + + createStreams := func(t *testing.T, js nats.JetStreamContext, n, replicas int) { + for i := 0; i < n; i++ { + sname := fmt.Sprintf("S:%d", i) + js.AddStream(&nats.StreamConfig{ + Name: sname, + MaxMsgsPerSubject: 5, + Replicas: replicas, + Subjects: []string{fmt.Sprintf("A.%d.>", i)}, + }) + time.Sleep(10 * time.Millisecond) + js.AddConsumer(sname, &nats.ConsumerConfig{ + Name: sname, + Durable: sname, + FilterSubject: ">", + }) + js.Publish(fmt.Sprintf("A.%d.foo", i), []byte("one")) + } + } + + // Create 5 streams in parallel with different configs, then + // check whether one of them is in an undefined state. + totalStreams := 5 + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + createStreams(t, js, totalStreams, 1) + }() + + wg.Add(1) + go func() { + defer wg.Done() + createStreams(t, js2, totalStreams, 2) + }() + wg.Wait() + + for i := 0; i < totalStreams; i++ { + ci, err := js.StreamInfo(fmt.Sprintf("S:%d", i)) + require_NoError(t, err) + + // Make sure that consumer scale up when peers are missing responds with error. + if ci.Config.Replicas == 2 { + // Starting with a single replica should still be valid. + js.AddConsumer(ci.Config.Name, &nats.ConsumerConfig{ + Name: "test", + Replicas: 1, + }) + _, err = js.UpdateConsumer(ci.Config.Name, &nats.ConsumerConfig{ + Name: "test", + Replicas: 2, + }) + if err != nil { + require_Equal(t, err.Error(), "nats: consumer config replica count exceeds parent stream") + } + } + } +}