diff --git a/server/jetstream_test.go b/server/jetstream_test.go index f00f47d3a6d..9b911b617ea 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20164,3 +20164,49 @@ func TestJetStreamDirectGetStartTimeSingleMsg(t *testing.T) { }) } } + +func TestJetStreamStreamRetentionUpdatesConsumers(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + for _, tc := range []struct { + from RetentionPolicy + to RetentionPolicy + }{ + {LimitsPolicy, InterestPolicy}, + {InterestPolicy, LimitsPolicy}, + } { + from, to, name := tc.from, tc.to, fmt.Sprintf("%sTo%s", tc.from, tc.to) + t.Run(name, func(t *testing.T) { + sc, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: name, + Subjects: []string{name}, + Retention: from, + Storage: FileStorage, + }) + require_NoError(t, err) + + _, err = js.AddConsumer(name, &nats.ConsumerConfig{ + Name: "test_consumer", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + mset, err := s.globalAccount().lookupStream(name) + require_NoError(t, err) + + o := mset.lookupConsumer("test_consumer") + require_NotNil(t, err) + require_Equal(t, o.retention, from) + + sc.Retention = to + _, err = jsStreamUpdate(t, nc, sc) + require_NoError(t, err) + + require_Equal(t, o.retention, to) + }) + } +} diff --git a/server/stream.go b/server/stream.go index d7b840fe718..cb150fcd884 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2177,7 +2177,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool, // If we're changing retention and haven't errored because of consumer // replicas by now, whip through and update the consumer retention. - if ocfg.Retention != cfg.Retention && cfg.Retention == InterestPolicy { + if ocfg.Retention != cfg.Retention { toUpdate := make([]*consumer, 0, len(mset.consumers)) for _, c := range mset.consumers { toUpdate = append(toUpdate, c)