diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ca44091416e..05170541a82 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5097,6 +5097,16 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error { // Update activity. o.lat = time.Now() + var sagap uint64 + if o.cfg.AckPolicy == AckAll { + // Always use the store state, as o.asflr is skipped ahead already. + // Capture before updating store. + state, err := o.store.BorrowState() + if err == nil { + sagap = sseq - state.AckFloor.Stream + } + } + // Do actual ack update to store. // Always do this to have it recorded. o.store.UpdateAcks(dseq, sseq) @@ -5121,21 +5131,6 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error { o.mu.Unlock() return nil } - - var sagap uint64 - if o.cfg.AckPolicy == AckAll { - if o.isLeader() { - sagap = sseq - o.asflr - } else { - // We are a follower so only have the store state, so read that in. - state, err := o.store.State() - if err != nil { - o.mu.Unlock() - return err - } - sagap = sseq - state.AckFloor.Stream - } - } o.mu.Unlock() if sagap > 1 { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index eff52af7bca..7f41d60fbc9 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -8004,6 +8004,70 @@ func TestJetStreamClusterUpgradeConsumerVersioning(t *testing.T) { } } +func TestJetStreamClusterInterestPolicyAckAll(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Retention: nats.InterestPolicy, + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + AckPolicy: nats.AckAllPolicy, + }) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + _, err = js.Publish("foo", []byte("ok")) + require_NoError(t, err) + } + + expectedStreamMsgs := func(msgs uint64) { + t.Helper() + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + si, err := js.StreamInfo("TEST") + if err != nil { + return err + } + if si.State.Msgs != msgs { + return fmt.Errorf("require uint64 equal, but got: %d != %d", si.State.Msgs, msgs) + } + return nil + }) + } + expectedStreamMsgs(100) + + for _, s := range c.servers { + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + o.mu.Lock() + // Ensure o.checkStateForInterestStream can't hide that the issue happened. + o.chkflr = 1000 + o.mu.Unlock() + } + + sub, err := js.PullSubscribe("foo", "CONSUMER") + require_NoError(t, err) + msgs, err := sub.Fetch(50) + require_NoError(t, err) + require_True(t, len(msgs) == 50) + require_NoError(t, msgs[49].AckSync()) + + expectedStreamMsgs(50) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value.