Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7753,8 +7753,15 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec

if rBefore < rAfter {
newPeerSet := nca.Group.Peers
// scale up by adding new members from the stream peer set that are not yet in the consumer peer set
// Scale up by adding new members from the stream peer set that are not yet in the consumer peer set.
streamPeerSet := copyStrings(sa.Group.Peers)

// Respond with error when there is a config mismatch between the intended config and expected peer size.
if len(streamPeerSet) < rAfter {
resp.Error = NewJSConsumerReplicasExceedsStreamError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
rand.Shuffle(rAfter, func(i, j int) { streamPeerSet[i], streamPeerSet[j] = streamPeerSet[j], streamPeerSet[i] })
for _, p := range streamPeerSet {
found := false
Expand Down
69 changes: 69 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6643,3 +6643,72 @@ func TestJetStreamClusterSDMMaxAgeProposeExpiryShortRetry(t *testing.T) {
})
}
}

func TestJetStreamClusterInvalidR1Config(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R1TEST", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.servers[0])
defer nc.Close()

nc2, js2 := jsClientConnect(t, c.servers[2])
defer nc2.Close()

createStreams := func(t *testing.T, js nats.JetStreamContext, n, replicas int) {
for i := 0; i < n; i++ {
sname := fmt.Sprintf("S:%d", i)
js.AddStream(&nats.StreamConfig{
Name: sname,
MaxMsgsPerSubject: 5,
Replicas: replicas,
Subjects: []string{fmt.Sprintf("A.%d.>", i)},
})
time.Sleep(10 * time.Millisecond)
js.AddConsumer(sname, &nats.ConsumerConfig{
Name: sname,
Durable: sname,
FilterSubject: ">",
})
js.Publish(fmt.Sprintf("A.%d.foo", i), []byte("one"))
}
}

// Create 5 streams in parallel with different configs, then
// check whether one of them is in an undefined state.
totalStreams := 5

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
createStreams(t, js, totalStreams, 1)
}()

wg.Add(1)
go func() {
defer wg.Done()
createStreams(t, js2, totalStreams, 2)
}()
wg.Wait()

for i := 0; i < totalStreams; i++ {
ci, err := js.StreamInfo(fmt.Sprintf("S:%d", i))
require_NoError(t, err)

// Make sure that consumer scale up when peers are missing responds with error.
if ci.Config.Replicas == 2 {
// Starting with a single replica should still be valid.
js.AddConsumer(ci.Config.Name, &nats.ConsumerConfig{
Name: "test",
Replicas: 1,
})
_, err = js.UpdateConsumer(ci.Config.Name, &nats.ConsumerConfig{
Name: "test",
Replicas: 2,
})
if err != nil {
require_Equal(t, err.Error(), "nats: consumer config replica count exceeds parent stream")
}
}
}
}