diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 90ff7c58332..d765f546f53 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7429,6 +7429,7 @@ func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *stre return nil } + replicas := cfg.replicas(sa.Config) peers := copyStrings(sa.Group.Peers) var _ss [5]string active := _ss[:0] @@ -7441,20 +7442,20 @@ func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *stre } } } - if quorum := cfg.Replicas/2 + 1; quorum > len(active) { + if quorum := replicas/2 + 1; quorum > len(active) { // Not enough active to satisfy the request. return nil } // If we want less then our parent stream, select from active. - if cfg.Replicas > 0 && cfg.Replicas < len(peers) { + if replicas > 0 && replicas < len(peers) { // Pedantic in case stream is say R5 and consumer is R3 and 3 or more offline, etc. - if len(active) < cfg.Replicas { + if len(active) < replicas { return nil } // First shuffle the active peers and then select to account for replica = 1. rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] }) - peers = active[:cfg.Replicas] + peers = active[:replicas] } storage := sa.Config.Storage if cfg.MemoryStorage { @@ -7640,12 +7641,6 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // We need to set the ephemeral here before replicating. if !isDurableConsumer(cfg) { - // We chose to have ephemerals be R=1 unless stream is interest or workqueue. - // Consumer can override. - if sa.Config.Retention == LimitsPolicy && cfg.Replicas <= 1 { - rg.Peers = []string{rg.Preferred} - rg.Name = groupNameForConsumer(rg.Peers, rg.Storage) - } if cfg.Name != _EMPTY_ { oname = cfg.Name } else { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 515c60d6f50..4a85168da61 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9026,6 +9026,25 @@ func TestJetStreamClusterCreateR3StreamWithOfflineNodes(t *testing.T) { }) } +func TestJetStreamClusterCreateEphemeralConsumerWithOfflineNodes(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + ml := c.leader() + nc, js := jsClientConnect(t, ml) + defer nc.Close() + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3}) + require_NoError(t, err) + + // Shutdown a random server. + c.randomNonLeader().Shutdown() + + for range 10 { + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{}) + require_NoError(t, err) + } +} + func TestJetStreamClusterSetPreferredToOnlineNode(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown()