From 025fd8febc7b6794e8f4e9d7d212b752c50611bf Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 9 Dec 2024 14:24:58 +0100 Subject: [PATCH] [FIXED] Consistently report AckFloor when replicated Signed-off-by: Maurice van Veen --- server/consumer.go | 17 +++---- server/jetstream_cluster_3_test.go | 62 ++++++++++++++++---------- server/jetstream_super_cluster_test.go | 16 ++++--- 3 files changed, 56 insertions(+), 39 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index e4118c72cf5..57acc91d4f6 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2948,23 +2948,20 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { } } - // If we are replicated and we are not the leader or we are filtered, we need to pull certain data from our store. - isLeader := o.isLeader() - if rg != nil && rg.node != nil && o.store != nil && (!isLeader || o.isFiltered()) { + // If we are replicated, we need to pull certain data from our store. + if rg != nil && rg.node != nil && o.store != nil { state, err := o.store.BorrowState() if err != nil { o.mu.Unlock() return nil } - if !isLeader { - info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream - info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream + // If we are the leader we could have o.sseq that is skipped ahead. + // To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence. + info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream + info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream + if !o.isLeader() { info.NumAckPending = len(state.Pending) info.NumRedelivered = len(state.Redelivered) - } else { - // Since we are filtered and we are the leader we could have o.sseq that is skipped ahead. - // To maintain consistency in reporting (e.g. jsz) we take the state for our delivered stream sequence. - info.Delivered.Stream = state.Delivered.Stream } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index db370e53939..e027c4ea7e2 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3575,7 +3575,7 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { sub, err := js.PullSubscribe("foo", "C") require_NoError(t, err) - // Publish as many messages as the ack floor check threshold +5. + // Publish as many messages as the ack floor check threshold +5 (what we set ackfloor to later). totalMessages := 55 for i := 0; i < totalMessages; i++ { sendStreamMsg(t, nc, "foo", "HELLO") @@ -3585,19 +3585,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { _, err = sub.Fetch(10) require_NoError(t, err) - // We will grab the state with delivered being 10 and ackfloor being 0 directly. - cl := c.consumerLeader(globalAccountName, "TEST", "C") - require_NotNil(t, cl) - - mset, err := cl.GlobalAccount().lookupStream("TEST") - require_NoError(t, err) - o := mset.lookupConsumer("C") - require_NotNil(t, o) - o.mu.RLock() - state, err := o.store.State() - o.mu.RUnlock() - require_NoError(t, err) - require_NotNil(t, state) + // We will initialize the state with delivered being 10 and ackfloor being 0 directly. + // Fetch will asynchronously propagate this state, so can't reliably request this from the leader immediately. + state := &ConsumerState{Delivered: SequencePair{Consumer: 10, Stream: 10}} // Now let messages expire. checkFor(t, 5*time.Second, time.Second, func() error { @@ -3634,17 +3624,35 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { require_NoError(t, o.raftNode().InstallSnapshot(snap)) } - cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C") + cl := c.consumerLeader(globalAccountName, "TEST", "C") + require_NotNil(t, cl) + err = cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C") + require_NoError(t, err) c.waitOnConsumerLeader(globalAccountName, "TEST", "C") checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { ci, err := js.ConsumerInfo("TEST", "C") + if err != nil { + return err + } + // Replicated state should stay the same. + if ci.AckFloor.Stream != 5 && ci.AckFloor.Consumer != 5 { + return fmt.Errorf("replicated AckFloor not correct, expected %d, got %+v", 5, ci.AckFloor) + } + + cl = c.consumerLeader(globalAccountName, "TEST", "C") + mset, err := cl.GlobalAccount().lookupStream("TEST") require_NoError(t, err) + o := mset.lookupConsumer("C") + require_NotNil(t, o) + o.mu.RLock() + defer o.mu.RUnlock() + // Make sure we catch this and adjust. - if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 { - return nil + if o.asflr != uint64(totalMessages) && o.adflr != 10 { + return fmt.Errorf("leader AckFloor not correct, expected %d, got %+v", 10, ci.AckFloor) } - return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor) + return nil }) } @@ -5458,12 +5466,18 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { // Want to compare sans cluster details which we know will change due to leader change. // Also last activity for delivered can be slightly off so nil out as well. - checkConsumerInfo := func(a, b *nats.ConsumerInfo) { + checkConsumerInfo := func(a, b *nats.ConsumerInfo, replicated bool) { t.Helper() require_Equal(t, a.Delivered.Consumer, 10) require_Equal(t, a.Delivered.Stream, 10) - require_Equal(t, a.AckFloor.Consumer, 10) - require_Equal(t, a.AckFloor.Stream, 10) + // If replicated, agreed upon state is used. Otherwise, o.asflr and o.adflr would be skipped ahead for R1. + if replicated { + require_Equal(t, a.AckFloor.Consumer, 0) + require_Equal(t, a.AckFloor.Stream, 0) + } else { + require_Equal(t, a.AckFloor.Consumer, 10) + require_Equal(t, a.AckFloor.Stream, 10) + } require_Equal(t, a.NumPending, 40) require_Equal(t, a.NumRedelivered, 0) a.Cluster, b.Cluster = nil, nil @@ -5473,7 +5487,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { } } - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, true) // Memory based. sub, err = js.PullSubscribe("foo", "mem", @@ -5503,7 +5517,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { cib, err = js.ConsumerInfo("TEST", "mem") require_NoError(t, err) - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, true) // Now file based but R1 and server restart. sub, err = js.PullSubscribe("foo", "r1", @@ -5537,7 +5551,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { // Created can skew a small bit due to server restart, this is expected. now := time.Now() cia.Created, cib.Created = now, now - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, false) } func TestJetStreamClusterConsumerDefaultsFromStream(t *testing.T) { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index c2ec399041a..3f6928457c1 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -1665,14 +1665,20 @@ func TestJetStreamSuperClusterConsumerDeliverNewBug(t *testing.T) { } c.waitOnConsumerLeader("$G", "T", "d") - ci, err = js.ConsumerInfo("T", "d") + + cl := c.consumerLeader(globalAccountName, "T", "d") + mset, err := cl.GlobalAccount().lookupStream("T") require_NoError(t, err) + o := mset.lookupConsumer("d") + require_NotNil(t, o) + o.mu.RLock() + defer o.mu.RUnlock() - if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 100 { - t.Fatalf("Incorrect consumer delivered info: %+v", ci.Delivered) + if o.dseq-1 != 0 || o.sseq-1 != 100 { + t.Fatalf("Incorrect consumer delivered info: dseq=%d, sseq=%d", o.dseq-1, o.sseq-1) } - if ci.NumPending != 0 { - t.Fatalf("Did not expect NumPending, got %d", ci.NumPending) + if np := o.checkNumPending(); np != 0 { + t.Fatalf("Did not expect NumPending, got %d", np) } }