diff --git a/server/consumer.go b/server/consumer.go index 293e06923a9..f5781528c3d 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1107,7 +1107,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri uch: make(chan struct{}, 1), mch: make(chan struct{}, 1), sfreq: int32(sampleFreq), - maxdc: uint64(config.MaxDeliver), + maxdc: uint64(max(config.MaxDeliver, 0)), // MaxDeliver is negative (-1) when infinite. maxp: config.MaxAckPending, retention: cfg.Retention, created: time.Now().UTC(), @@ -2322,7 +2322,8 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { } // Set MaxDeliver if changed if cfg.MaxDeliver != o.cfg.MaxDeliver { - o.maxdc = uint64(cfg.MaxDeliver) + // MaxDeliver is negative (-1) when infinite. + o.maxdc = uint64(max(cfg.MaxDeliver, 0)) } // Set InactiveThreshold if changed. if val := cfg.InactiveThreshold; val != o.cfg.InactiveThreshold { diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 426a1b0944f..a738f633f2f 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10241,3 +10241,47 @@ func TestJetStreamConsumerPrioritized(t *testing.T) { require_NotNil(t, msg) }) } + +func TestJetStreamConsumerMaxDeliverUnderflow(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}}) + require_NoError(t, err) + + cfg := &nats.ConsumerConfig{Durable: "CONSUMER", MaxDeliver: -1} + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + // Infinite MaxDeliver should be zero. + o.mu.RLock() + maxdc := o.maxdc + o.mu.RUnlock() + require_Equal(t, maxdc, 0) + + // Finite MaxDeliver should be reported the same. + cfg.MaxDeliver = 1 + _, err = js.UpdateConsumer("TEST", cfg) + require_NoError(t, err) + o.mu.RLock() + maxdc = o.maxdc + o.mu.RUnlock() + require_Equal(t, maxdc, 1) + + // Infinite MaxDeliver should be zero. + cfg.MaxDeliver = -1 + _, err = js.UpdateConsumer("TEST", cfg) + require_NoError(t, err) + o.mu.RLock() + maxdc = o.maxdc + o.mu.RUnlock() + require_Equal(t, maxdc, 0) +}