diff --git a/server/consumer.go b/server/consumer.go index 28951dec53e..b1d316a6d90 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4270,7 +4270,7 @@ func (o *consumer) checkAckFloor() { // We will set it explicitly to 1 behind our current lowest in pending, or if // pending is empty, to our current delivered -1. const minOffThreshold = 50 - if o.asflr < ss.FirstSeq-minOffThreshold { + if ss.FirstSeq >= minOffThreshold && o.asflr < ss.FirstSeq-minOffThreshold { var psseq, pdseq uint64 for seq, p := range o.pending { if psseq == 0 || seq < psseq { diff --git a/server/filestore.go b/server/filestore.go index 79d91746a26..22ccbcec936 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -9433,14 +9433,6 @@ func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error { } func (o *consumerFileStore) Update(state *ConsumerState) error { - o.mu.Lock() - defer o.mu.Unlock() - - // Check to see if this is an outdated update. - if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream { - return nil - } - // Sanity checks. if state.AckFloor.Consumer > state.Delivered.Consumer { return fmt.Errorf("bad ack floor for consumer") @@ -9468,6 +9460,15 @@ func (o *consumerFileStore) Update(state *ConsumerState) error { } } + // Replace our state. + o.mu.Lock() + defer o.mu.Unlock() + + // Check to see if this is an outdated update. + if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream { + return fmt.Errorf("old update ignored") + } + o.state.Delivered = state.Delivered o.state.AckFloor = state.AckFloor o.state.Pending = pending diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index deb5adf1a48..1dc0d4d3745 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3552,7 +3552,7 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) { s.WaitForShutdown() } -// If a consumer has not been registered (possible in heavily loaded systems with lots of assets) +// If a consumer has not been registered (possible in heavily loaded systems with lots of assets) // it could miss the signal of a message going away. If that message was pending and expires the // ack floor could fall below the stream first sequence. This test will force that condition and // make sure the system resolves itself. @@ -3575,7 +3575,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { sub, err := js.PullSubscribe("foo", "C") require_NoError(t, err) - for i := 0; i < 10; i++ { + // Publish as many messages as the ack floor check threshold +5. + totalMessages := 55 + for i := 0; i < totalMessages; i++ { sendStreamMsg(t, nc, "foo", "HELLO") } @@ -3619,10 +3621,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { o := mset.lookupConsumer("C") require_NotNil(t, o) o.mu.Lock() - err = o.setStoreState(state) + o.applyState(state) cfs := o.store.(*consumerFileStore) o.mu.Unlock() - require_NoError(t, err) // The lower layer will ignore, so set more directly. cfs.mu.Lock() cfs.state = *state @@ -3640,10 +3641,10 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { ci, err := js.ConsumerInfo("TEST", "C") require_NoError(t, err) // Make sure we catch this and adjust. - if ci.AckFloor.Stream == 10 && ci.AckFloor.Consumer == 10 { + if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 { return nil } - return fmt.Errorf("AckFloor not correct, expected 10, got %+v", ci.AckFloor) + return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor) }) } diff --git a/server/memstore.go b/server/memstore.go index ee953e1fa99..48defb8059d 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1689,8 +1689,6 @@ func (o *consumerMemStore) Update(state *ConsumerState) error { pending = make(map[uint64]*Pending, len(state.Pending)) for seq, p := range state.Pending { pending[seq] = &Pending{p.Sequence, p.Timestamp} - } - for seq := range pending { if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream { return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq) } @@ -1705,10 +1703,10 @@ func (o *consumerMemStore) Update(state *ConsumerState) error { // Replace our state. o.mu.Lock() + defer o.mu.Unlock() // Check to see if this is an outdated update. - if state.Delivered.Consumer < o.state.Delivered.Consumer { - o.mu.Unlock() + if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream { return fmt.Errorf("old update ignored") } @@ -1716,7 +1714,6 @@ func (o *consumerMemStore) Update(state *ConsumerState) error { o.state.AckFloor = state.AckFloor o.state.Pending = pending o.state.Redelivered = redelivered - o.mu.Unlock() return nil }