From e8dd1dd9a379c94f0c335da208c79559d8c31d51 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 6 Dec 2024 15:32:36 +0100 Subject: [PATCH 01/15] [FIXED] Subject state consistency Signed-off-by: Maurice van Veen --- server/filestore.go | 26 +++++++----- server/memstore.go | 30 +++++++++----- server/store_test.go | 94 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 19 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 8d2bfa07c11..2224618bd9a 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2613,10 +2613,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si // Always reset. ss.First, ss.Last, ss.Msgs = 0, 0, 0 - if filter == _EMPTY_ { - filter = fwcs - } - // We do need to figure out the first and last sequences. wc := subjectHasWildcard(filter) start, stop := uint32(math.MaxUint32), uint32(0) @@ -7388,6 +7384,9 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { // Update fss smb.removeSeqPerSubject(sm.subj, mseq) fs.removePerSubject(sm.subj) + // Need to mark the sequence as deleted. Otherwise, recalculating ss.First + // for per-subject info would be able to find it still. + smb.dmap.Insert(mseq) } } @@ -7835,11 +7834,16 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { // Only one left. if ss.Msgs == 1 { - if seq == ss.Last { - ss.Last = ss.First - } else { - ss.First = ss.Last + // Update first if we need to, we must check if this removal is about what's going to be ss.First + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } + // If we're removing the first message, we must recalculate again. + // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it. + if ss.First == seq { + mb.recalculateFirstForSubj(subj, ss.First, ss) + } + ss.Last = ss.First ss.firstNeedsUpdate = false return } @@ -7869,8 +7873,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si startSlot = 0 } + fseq := startSeq + 1 + if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { + fseq = mbFseq + } var le = binary.LittleEndian - for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ { + for slot := startSlot; slot < len(mb.cache.idx); slot++ { bi := mb.cache.idx[slot] &^ hbit if bi == dbit { // delete marker so skip. diff --git a/server/memstore.go b/server/memstore.go index cdf84a74c80..acf0616da08 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1006,8 +1006,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { if sm := ms.msgs[seq]; sm != nil { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ - delete(ms.msgs, seq) ms.removeSeqPerSubject(sm.subj, seq) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, seq) } } if purged > ms.state.Msgs { @@ -1095,8 +1096,9 @@ func (ms *memStore) Truncate(seq uint64) error { if sm := ms.msgs[i]; sm != nil { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - delete(ms.msgs, i) ms.removeSeqPerSubject(sm.subj, i) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, i) } } // Reset last. @@ -1357,17 +1359,24 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // If we know we only have 1 msg left don't need to search for next first. + // Only one left. 
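	// Background for this fix, in sketch form: per-subject state is tracked
	// lazily, so a plain removal only decrements the count and marks First
	// dirty instead of rescanning (the same pattern this function keeps
	// below):
	//
	//	ss.Msgs--
	//	ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
	//
	// Readers must recalculate before trusting ss.First; the bug was that
	// the single-message collapse below trusted a possibly stale First/Last.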
if ss.Msgs == 1 { - if seq == ss.Last { - ss.Last = ss.First - } else { - ss.First = ss.Last + // Update first if we need to, we must check if this removal is about what's going to be ss.First + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subj, ss.First, ss) } + // If we're removing the first message, we must recalculate again. + // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it. + if ss.First == seq { + ms.recalculateFirstForSubj(subj, ss.First, ss) + } + ss.Last = ss.First ss.firstNeedsUpdate = false - } else { - ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + return } + + // We can lazily calculate the first sequence when needed. + ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate } // Will recalculate the first sequence for this subject in this block. @@ -1397,7 +1406,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - delete(ms.msgs, seq) if ms.state.Msgs > 0 { ms.state.Msgs-- if ss > ms.state.Bytes { @@ -1422,6 +1430,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { // Remove any per subject tracking. ms.removeSeqPerSubject(sm.subj, seq) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, seq) if ms.scb != nil { // We do not want to hold any locks here. diff --git a/server/store_test.go b/server/store_test.go index f7832974b5b..d2c3481a1a5 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -141,3 +141,97 @@ func TestStoreDeleteRange(t *testing.T) { require_Equal(t, last, 2) require_Equal(t, num, 1) } + +func TestStoreSubjectStateConsistency(t *testing.T) { + testAllStoreAllPermutations( + t, false, + StreamConfig{Name: "TEST", Subjects: []string{"foo"}}, + func(t *testing.T, fs StreamStore) { + getSubjectState := func() SimpleState { + t.Helper() + ss := fs.SubjectsState("foo") + return ss["foo"] + } + + // Publish an initial batch of messages. + for i := 0; i < 4; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + // Expect 4 msgs, with first=1, last=4. + ss := getSubjectState() + require_Equal(t, ss.Msgs, 4) + require_Equal(t, ss.First, 1) + require_Equal(t, ss.Last, 4) + + // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. + removed, err := fs.RemoveMsg(1) + require_NoError(t, err) + require_True(t, removed) + + // Will update first, so corrects to seq 2. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 3) + require_Equal(t, ss.First, 2) + require_Equal(t, ss.Last, 4) + + // Remove last message. + removed, err = fs.RemoveMsg(4) + require_NoError(t, err) + require_True(t, removed) + + // ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 2) + require_Equal(t, ss.First, 2) + require_Equal(t, ss.Last, 4) + + // Remove first message again. + removed, err = fs.RemoveMsg(2) + require_NoError(t, err) + require_True(t, removed) + + // Since we only have one message left, must update ss.First and set ss.Last to equal. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.First, 3) + require_Equal(t, ss.Last, 3) + + // Publish some more messages so we can test another scenario. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + // Just check the state is complete again. 
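			// (Sanity check on the expected values: seq 3 is still the
			// oldest "foo" message, and the three appends above landed on
			// seqs 5, 6 and 7.)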
+ ss = getSubjectState() + require_Equal(t, ss.Msgs, 4) + require_Equal(t, ss.First, 3) + require_Equal(t, ss.Last, 7) + + // Remove last sequence, ss.Last is lazy so doesn't get updated. + removed, err = fs.RemoveMsg(7) + require_NoError(t, err) + require_True(t, removed) + + // Remove first sequence, ss.First is lazy so doesn't get updated. + removed, err = fs.RemoveMsg(3) + require_NoError(t, err) + require_True(t, removed) + + // Remove (now) first sequence, but because ss.First is lazy we first need to recalculate + // to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First + // yet again, since ss.Last is lazy and is not correct. + removed, err = fs.RemoveMsg(5) + require_NoError(t, err) + require_True(t, removed) + + // ss.First should equal ss.Last, last should have been updated now. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.First, 6) + require_Equal(t, ss.Last, 6) + }, + ) +} From 3b9779f03568be67c0addeb037af8ff917151be3 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 9 Dec 2024 14:24:58 +0100 Subject: [PATCH 02/15] [FIXED] Consistently report AckFloor when replicated Signed-off-by: Maurice van Veen --- server/consumer.go | 17 +++---- server/jetstream_cluster_3_test.go | 62 ++++++++++++++++---------- server/jetstream_super_cluster_test.go | 16 ++++--- 3 files changed, 56 insertions(+), 39 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index a91c1925441..438041ec899 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2667,23 +2667,20 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { TimeStamp: time.Now().UTC(), } - // If we are replicated and we are not the leader or we are filtered, we need to pull certain data from our store. - isLeader := o.isLeader() - if rg != nil && rg.node != nil && o.store != nil && (!isLeader || o.isFiltered()) { + // If we are replicated, we need to pull certain data from our store. + if rg != nil && rg.node != nil && o.store != nil { state, err := o.store.BorrowState() if err != nil { o.mu.Unlock() return nil } - if !isLeader { - info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream - info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream + // If we are the leader we could have o.sseq that is skipped ahead. + // To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence. + info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream + info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream + if !o.isLeader() { info.NumAckPending = len(state.Pending) info.NumRedelivered = len(state.Redelivered) - } else { - // Since we are filtered and we are the leader we could have o.sseq that is skipped ahead. - // To maintain consistency in reporting (e.g. jsz) we take the state for our delivered stream sequence. - info.Delivered.Stream = state.Delivered.Stream } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index a5b32c0e7b8..25d1ebea426 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3573,7 +3573,7 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { sub, err := js.PullSubscribe("foo", "C") require_NoError(t, err) - // Publish as many messages as the ack floor check threshold +5. 
+ // Publish as many messages as the ack floor check threshold +5 (what we set ackfloor to later). totalMessages := 55 for i := 0; i < totalMessages; i++ { sendStreamMsg(t, nc, "foo", "HELLO") @@ -3583,19 +3583,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { _, err = sub.Fetch(10) require_NoError(t, err) - // We will grab the state with delivered being 10 and ackfloor being 0 directly. - cl := c.consumerLeader(globalAccountName, "TEST", "C") - require_NotNil(t, cl) - - mset, err := cl.GlobalAccount().lookupStream("TEST") - require_NoError(t, err) - o := mset.lookupConsumer("C") - require_NotNil(t, o) - o.mu.RLock() - state, err := o.store.State() - o.mu.RUnlock() - require_NoError(t, err) - require_NotNil(t, state) + // We will initialize the state with delivered being 10 and ackfloor being 0 directly. + // Fetch will asynchronously propagate this state, so can't reliably request this from the leader immediately. + state := &ConsumerState{Delivered: SequencePair{Consumer: 10, Stream: 10}} // Now let messages expire. checkFor(t, 5*time.Second, time.Second, func() error { @@ -3632,17 +3622,35 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { require_NoError(t, o.raftNode().InstallSnapshot(snap)) } - cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C") + cl := c.consumerLeader(globalAccountName, "TEST", "C") + require_NotNil(t, cl) + err = cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C") + require_NoError(t, err) c.waitOnConsumerLeader(globalAccountName, "TEST", "C") checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { ci, err := js.ConsumerInfo("TEST", "C") + if err != nil { + return err + } + // Replicated state should stay the same. + if ci.AckFloor.Stream != 5 && ci.AckFloor.Consumer != 5 { + return fmt.Errorf("replicated AckFloor not correct, expected %d, got %+v", 5, ci.AckFloor) + } + + cl = c.consumerLeader(globalAccountName, "TEST", "C") + mset, err := cl.GlobalAccount().lookupStream("TEST") require_NoError(t, err) + o := mset.lookupConsumer("C") + require_NotNil(t, o) + o.mu.RLock() + defer o.mu.RUnlock() + // Make sure we catch this and adjust. - if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 { - return nil + if o.asflr != uint64(totalMessages) && o.adflr != 10 { + return fmt.Errorf("leader AckFloor not correct, expected %d, got %+v", 10, ci.AckFloor) } - return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor) + return nil }) } @@ -5456,12 +5464,18 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { // Want to compare sans cluster details which we know will change due to leader change. // Also last activity for delivered can be slightly off so nil out as well. - checkConsumerInfo := func(a, b *nats.ConsumerInfo) { + checkConsumerInfo := func(a, b *nats.ConsumerInfo, replicated bool) { t.Helper() require_Equal(t, a.Delivered.Consumer, 10) require_Equal(t, a.Delivered.Stream, 10) - require_Equal(t, a.AckFloor.Consumer, 10) - require_Equal(t, a.AckFloor.Stream, 10) + // If replicated, agreed upon state is used. Otherwise, o.asflr and o.adflr would be skipped ahead for R1. 
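		// The reporting rule this asserts, as a sketch (SequencePair is the
		// real type; the helper itself is hypothetical):
		//
		//	func reportedAckFloor(replicated bool, stored, leader SequencePair) SequencePair {
		//		if replicated {
		//			return stored // always the store's agreed-upon state
		//		}
		//		return leader // R1: o.adflr/o.asflr, which may have skipped ahead
		//	}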
+ if replicated { + require_Equal(t, a.AckFloor.Consumer, 0) + require_Equal(t, a.AckFloor.Stream, 0) + } else { + require_Equal(t, a.AckFloor.Consumer, 10) + require_Equal(t, a.AckFloor.Stream, 10) + } require_Equal(t, a.NumPending, 40) require_Equal(t, a.NumRedelivered, 0) a.Cluster, b.Cluster = nil, nil @@ -5471,7 +5485,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { } } - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, true) // Memory based. sub, err = js.PullSubscribe("foo", "mem", @@ -5501,7 +5515,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { cib, err = js.ConsumerInfo("TEST", "mem") require_NoError(t, err) - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, true) // Now file based but R1 and server restart. sub, err = js.PullSubscribe("foo", "r1", @@ -5535,7 +5549,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { // Created can skew a small bit due to server restart, this is expected. now := time.Now() cia.Created, cib.Created = now, now - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, false) } func TestJetStreamClusterConsumerDefaultsFromStream(t *testing.T) { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 80fd87a6202..2e47cc857bc 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -1664,14 +1664,20 @@ func TestJetStreamSuperClusterConsumerDeliverNewBug(t *testing.T) { } c.waitOnConsumerLeader("$G", "T", "d") - ci, err = js.ConsumerInfo("T", "d") + + cl := c.consumerLeader(globalAccountName, "T", "d") + mset, err := cl.GlobalAccount().lookupStream("T") require_NoError(t, err) + o := mset.lookupConsumer("d") + require_NotNil(t, o) + o.mu.RLock() + defer o.mu.RUnlock() - if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 100 { - t.Fatalf("Incorrect consumer delivered info: %+v", ci.Delivered) + if o.dseq-1 != 0 || o.sseq-1 != 100 { + t.Fatalf("Incorrect consumer delivered info: dseq=%d, sseq=%d", o.dseq-1, o.sseq-1) } - if ci.NumPending != 0 { - t.Fatalf("Did not expect NumPending, got %d", ci.NumPending) + if np := o.checkNumPending(); np != 0 { + t.Fatalf("Did not expect NumPending, got %d", np) } } From 33ba45b1464d30f02f5ddefa4792ad17240f755b Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 10 Dec 2024 09:42:56 +0100 Subject: [PATCH 03/15] Improve per-subject state performance Signed-off-by: Maurice van Veen --- server/filestore.go | 19 +++---------------- server/memstore.go | 19 +++---------------- 2 files changed, 6 insertions(+), 32 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 2224618bd9a..7066c38e709 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -7832,22 +7832,6 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- - // Only one left. - if ss.Msgs == 1 { - // Update first if we need to, we must check if this removal is about what's going to be ss.First - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) - } - // If we're removing the first message, we must recalculate again. - // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it. - if ss.First == seq { - mb.recalculateFirstForSubj(subj, ss.First, ss) - } - ss.Last = ss.First - ss.firstNeedsUpdate = false - return - } - // We can lazily calculate the first sequence when needed. 
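	// Dropping the Msgs==1 fast path is safe because the recalculation this
	// patch extends further down now pins both ends itself once a single
	// message remains, roughly:
	//
	//	if ss.Msgs == 1 {
	//		ss.First, ss.Last = seq, seq // sole survivor: the ends must agree
	//	}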
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate } @@ -7898,6 +7882,9 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si continue } ss.First = seq + if ss.Msgs == 1 { + ss.Last = seq + } return } } diff --git a/server/memstore.go b/server/memstore.go index acf0616da08..097a6493ee4 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1359,22 +1359,6 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // Only one left. - if ss.Msgs == 1 { - // Update first if we need to, we must check if this removal is about what's going to be ss.First - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) - } - // If we're removing the first message, we must recalculate again. - // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it. - if ss.First == seq { - ms.recalculateFirstForSubj(subj, ss.First, ss) - } - ss.Last = ss.First - ss.firstNeedsUpdate = false - return - } - // We can lazily calculate the first sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate } @@ -1389,6 +1373,9 @@ func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si for ; tseq <= ss.Last; tseq++ { if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { ss.First = tseq + if ss.Msgs == 1 { + ss.Last = tseq + } ss.firstNeedsUpdate = false return } From 4e9b602a66efb80564dbe43ee5a1207c9fd7a5c1 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 10 Dec 2024 15:57:28 +0100 Subject: [PATCH 04/15] Don't mark deletes, we don't recalculate in fs.removePerSubject anymore Signed-off-by: Maurice van Veen --- server/filestore.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 7066c38e709..61f1f7abe69 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -7384,9 +7384,6 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { // Update fss smb.removeSeqPerSubject(sm.subj, mseq) fs.removePerSubject(sm.subj) - // Need to mark the sequence as deleted. Otherwise, recalculating ss.First - // for per-subject info would be able to find it still. - smb.dmap.Insert(mseq) } } From 0630752e4870092af94ed9c44e483e305fe70964 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 10 Dec 2024 18:07:28 +0100 Subject: [PATCH 05/15] [FIXED] ss.Last was not kept up-to-date Signed-off-by: Maurice van Veen --- server/filestore.go | 145 +++++++++++++++++++++++++++------------ server/filestore_test.go | 2 +- server/memstore.go | 72 ++++++++++++------- server/store.go | 2 + server/store_test.go | 39 ++++++++--- 5 files changed, 181 insertions(+), 79 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 61f1f7abe69..689e7e08efb 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2312,8 +2312,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor fseq = lseq + 1 for _, subj := range subs { ss, _ := mb.fss.Find(stringToBytes(subj)) - if ss != nil && ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { + mb.recalculateForSubj(subj, ss) } if ss == nil || start > ss.Last || ss.First >= fseq { continue @@ -2442,8 +2442,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( // If we already found a partial then don't do anything else. 
return } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(bytesToString(bsubj), ss) } if sseq <= ss.First { update(ss) @@ -2742,8 +2742,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { mb.lsts = time.Now().UnixNano() mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { subj := string(bsubj) - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } oss := fss[subj] if oss.First == 0 { // New @@ -2933,8 +2933,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return } subj := bytesToString(bsubj) - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3221,8 +3221,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3895,8 +3895,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { info.fblk = i } } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } mb.mu.Unlock() // Re-acquire fs lock @@ -4027,8 +4027,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() ss, ok := mb.fss.Find(stringToBytes(subj)) - if ok && ss != nil && ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { + mb.recalculateForSubj(subj, ss) } mb.mu.Unlock() if ss == nil { @@ -7829,13 +7829,14 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- - // We can lazily calculate the first sequence when needed. + // We can lazily calculate the first/last sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalulate the first sequence for this subject in this block. +// Will recalculate the first and/or last sequence for this subject in this block. // Will avoid slower path message lookups and scan the cache directly instead. -func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { +func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { // Need to make sure messages are loaded. if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -7843,46 +7844,100 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si } } - // Mark first as updated. 
- ss.firstNeedsUpdate = false - - startSlot := int(startSeq - mb.cache.fseq) + startSlot := int(ss.First - mb.cache.fseq) + if startSlot < 0 { + startSlot = 0 + } if startSlot >= len(mb.cache.idx) { ss.First = ss.Last return - } else if startSlot < 0 { - startSlot = 0 } - - fseq := startSeq + 1 - if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { - fseq = mbFseq + endSlot := int(ss.Last - mb.cache.fseq) + if endSlot < 0 { + endSlot = 0 + } + if endSlot >= len(mb.cache.idx) || startSlot > endSlot { + return } + var le = binary.LittleEndian - for slot := startSlot; slot < len(mb.cache.idx); slot++ { - bi := mb.cache.idx[slot] &^ hbit - if bi == dbit { - // delete marker so skip. - continue + if ss.firstNeedsUpdate { + // Mark first as updated. + ss.firstNeedsUpdate = false + + fseq := ss.First + 1 + if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { + fseq = mbFseq + } + for slot := startSlot; slot < len(mb.cache.idx); slot++ { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. + continue + } + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + ss.First = ss.Last + return + } + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + continue + } + ss.First = seq + if ss.Msgs == 1 { + ss.Last = seq + ss.lastNeedsUpdate = false + return + } + // Skip the start slot ahead, if we need to recalculate last we can stop early. + startSlot = slot + break + } } - li := int(bi) - mb.cache.off - if li >= len(mb.cache.buf) { - ss.First = ss.Last - return + } + if ss.lastNeedsUpdate { + // Mark last as updated. + ss.lastNeedsUpdate = false + + lseq := ss.Last - 1 + if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq { + lseq = mbLseq } - buf := mb.cache.buf[li:] - hdr := buf[:msgHdrSize] - slen := int(le.Uint16(hdr[20:])) - if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { - seq := le.Uint64(hdr[4:]) - if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + for slot := endSlot; slot >= startSlot; slot-- { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. continue } - ss.First = seq - if ss.Msgs == 1 { + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + // Can't overwrite ss.Last, just skip. + return + } + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + continue + } + // Sequence should never be lower, but guard against it nonetheless. + if seq < ss.First { + seq = ss.First + } ss.Last = seq + if ss.Msgs == 1 { + ss.First = seq + ss.firstNeedsUpdate = false + } + return } - return } } } diff --git a/server/filestore_test.go b/server/filestore_test.go index 1be968f8b11..458cef7a740 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { mb.clearCacheAndOffset() // Now call with start sequence of 1, the old one // This will panic without the fix. - mb.recalculateFirstForSubj("foo", 1, ss) + mb.recalculateForSubj("foo", ss) // Make sure it was update properly. 
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } diff --git a/server/memstore.go b/server/memstore.go index 097a6493ee4..7df364c950c 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -140,8 +140,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int return ErrMaxBytes } // If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room. - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } sm, ok := ms.msgs[ss.First] if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) { @@ -427,8 +427,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje var totalSkipped uint64 // We will track start and end sequences as we go. ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + if fss.firstNeedsUpdate || fss.lastNeedsUpdate { + ms.recalculateForSubj(bytesToString(subj), fss) } if sseq <= fss.First { update(fss) @@ -582,8 +582,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { fss := make(map[string]SimpleState) ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) { subjs := string(subj) - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subjs, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subjs, ss) } oss := fss[subjs] if oss.First == 0 { // New @@ -672,8 +672,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo var totalSkipped uint64 // We will track start and end sequences as we go. IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + if fss.firstNeedsUpdate || fss.lastNeedsUpdate { + ms.recalculateForSubj(bytesToString(subj), fss) } if sseq <= fss.First { update(fss) @@ -790,8 +790,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { return } for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs { - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } if !ms.removeMsg(ss.First, false) { break @@ -1264,8 +1264,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store if !ok { continue } - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } if ss.First < fseq { fseq = ss.First @@ -1359,25 +1359,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // We can lazily calculate the first sequence when needed. + // We can lazily calculate the first/last sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalculate the first sequence for this subject in this block. +// Will recalculate the first and/or last sequence for this subject. // Lock should be held. 
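// In outline, the rewritten version below runs two bounded scans over
// ms.msgs, one per dirty flag (a sketch only; the real code also clamps to
// ms.state.FirstSeq/LastSeq, clears the flags, and collapses both ends onto
// the survivor when Msgs==1):
//
//	for seq := ss.First + 1; seq <= ss.Last; seq++ { // forward for First
//		if sm := ms.msgs[seq]; sm != nil && sm.subj == subj {
//			ss.First = seq
//			break
//		}
//	}
//	for seq := ss.Last - 1; seq >= ss.First; seq-- { // backward for Last
//		if sm := ms.msgs[seq]; sm != nil && sm.subj == subj {
//			ss.Last = seq
//			break
//		}
//	}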
-func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { - tseq := startSeq + 1 - if tseq < ms.state.FirstSeq { - tseq = ms.state.FirstSeq - } - for ; tseq <= ss.Last; tseq++ { - if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { - ss.First = tseq - if ss.Msgs == 1 { +func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) { + if ss.firstNeedsUpdate { + tseq := ss.First + 1 + if tseq < ms.state.FirstSeq { + tseq = ms.state.FirstSeq + } + for ; tseq <= ss.Last; tseq++ { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.First = tseq + ss.firstNeedsUpdate = false + if ss.Msgs == 1 { + ss.Last = tseq + ss.lastNeedsUpdate = false + return + } + break + } + } + } + if ss.lastNeedsUpdate { + tseq := ss.Last - 1 + if tseq > ms.state.LastSeq { + tseq = ms.state.LastSeq + } + for ; tseq >= ss.First; tseq-- { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { ss.Last = tseq + ss.lastNeedsUpdate = false + if ss.Msgs == 1 { + ss.First = tseq + ss.firstNeedsUpdate = false + } + return } - ss.firstNeedsUpdate = false - return } } } diff --git a/server/store.go b/server/store.go index 72e039816e9..1c8f7f7ec1f 100644 --- a/server/store.go +++ b/server/store.go @@ -166,6 +166,8 @@ type SimpleState struct { // Internal usage for when the first needs to be updated before use. firstNeedsUpdate bool + // Internal usage for when the last needs to be updated before use. + lastNeedsUpdate bool } // LostStreamData indicates msgs that have been lost. diff --git a/server/store_test.go b/server/store_test.go index d2c3481a1a5..168b488d62c 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -152,6 +152,19 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss := fs.SubjectsState("foo") return ss["foo"] } + var smp StoreMsg + expectFirstSeq := func(eseq uint64) { + t.Helper() + sm, _, err := fs.LoadNextMsg("foo", false, 0, &smp) + require_NoError(t, err) + require_Equal(t, sm.seq, eseq) + } + expectLastSeq := func(eseq uint64) { + t.Helper() + sm, err := fs.LoadLastMsg("foo", &smp) + require_NoError(t, err) + require_Equal(t, sm.seq, eseq) + } // Publish an initial batch of messages. for i := 0; i < 4; i++ { @@ -163,7 +176,9 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss := getSubjectState() require_Equal(t, ss.Msgs, 4) require_Equal(t, ss.First, 1) + expectFirstSeq(1) require_Equal(t, ss.Last, 4) + expectLastSeq(4) // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. removed, err := fs.RemoveMsg(1) @@ -174,29 +189,35 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss = getSubjectState() require_Equal(t, ss.Msgs, 3) require_Equal(t, ss.First, 2) + expectFirstSeq(2) require_Equal(t, ss.Last, 4) + expectLastSeq(4) - // Remove last message. + // Remove last message, ss.Last is lazy so will only mark ss.lastNeedsUpdate. removed, err = fs.RemoveMsg(4) require_NoError(t, err) require_True(t, removed) - // ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases. + // Will update last, so corrects to 3. ss = getSubjectState() require_Equal(t, ss.Msgs, 2) require_Equal(t, ss.First, 2) - require_Equal(t, ss.Last, 4) + expectFirstSeq(2) + require_Equal(t, ss.Last, 3) + expectLastSeq(3) // Remove first message again. removed, err = fs.RemoveMsg(2) require_NoError(t, err) require_True(t, removed) - // Since we only have one message left, must update ss.First and set ss.Last to equal. 
+ // Since we only have one message left, must update ss.First and ensure ss.Last equals. ss = getSubjectState() require_Equal(t, ss.Msgs, 1) require_Equal(t, ss.First, 3) + expectFirstSeq(3) require_Equal(t, ss.Last, 3) + expectLastSeq(3) // Publish some more messages so we can test another scenario. for i := 0; i < 3; i++ { @@ -208,7 +229,9 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss = getSubjectState() require_Equal(t, ss.Msgs, 4) require_Equal(t, ss.First, 3) + expectFirstSeq(3) require_Equal(t, ss.Last, 7) + expectLastSeq(7) // Remove last sequence, ss.Last is lazy so doesn't get updated. removed, err = fs.RemoveMsg(7) @@ -220,18 +243,18 @@ func TestStoreSubjectStateConsistency(t *testing.T) { require_NoError(t, err) require_True(t, removed) - // Remove (now) first sequence, but because ss.First is lazy we first need to recalculate - // to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First - // yet again, since ss.Last is lazy and is not correct. + // Remove (now) first sequence. Both ss.First and ss.Last are lazy and both need to be recalculated later. removed, err = fs.RemoveMsg(5) require_NoError(t, err) require_True(t, removed) - // ss.First should equal ss.Last, last should have been updated now. + // ss.First and ss.Last should both be recalculated and equal each other. ss = getSubjectState() require_Equal(t, ss.Msgs, 1) require_Equal(t, ss.First, 6) + expectFirstSeq(6) require_Equal(t, ss.Last, 6) + expectLastSeq(6) }, ) } From dd8fba8515340d5f09f658173240315a169f3e21 Mon Sep 17 00:00:00 2001 From: Jack Date: Thu, 31 Oct 2024 13:41:24 -0400 Subject: [PATCH 06/15] modified reply map pruning logic to prune every x messages or after time duration --- server/client.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/client.go b/server/client.go index d87d122110c..3325361bee0 100644 --- a/server/client.go +++ b/server/client.go @@ -261,6 +261,9 @@ type client struct { last time.Time lastIn time.Time + repliesSincePrune uint16 + lastReplyPrune time.Time + headers bool rtt time.Duration @@ -420,6 +423,7 @@ const ( pruneSize = 32 routeTargetInit = 8 replyPermLimit = 4096 + replyPruneTime = time.Second ) // Represent read cache booleans with a bitmask @@ -3528,7 +3532,8 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su // do that accounting here. We only look at client.replies which will be non-nil. 
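	// The new policy, in sketch form: pruning is now driven by an insert
	// counter and a clock rather than by the current map size, so stale
	// entries are dropped regularly (replyPermLimit already existed;
	// replyPruneTime is the constant added above):
	//
	//	prune := client.repliesSincePrune > replyPermLimit ||
	//		time.Since(client.lastReplyPrune) > replyPruneTime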
if client.replies != nil && len(reply) > 0 { client.replies[string(reply)] = &resp{time.Now(), 0} - if len(client.replies) > replyPermLimit { + client.repliesSincePrune++ + if client.repliesSincePrune > replyPermLimit || time.Since(client.lastReplyPrune) > replyPruneTime { client.pruneReplyPerms() } } @@ -3652,6 +3657,9 @@ func (c *client) pruneReplyPerms() { delete(c.replies, k) } } + + c.repliesSincePrune = 0 + c.lastReplyPrune = time.Now() } // pruneDenyCache will prune the deny cache via randomly From 1905d6511235ee142d0918f80211af5d959feb40 Mon Sep 17 00:00:00 2001 From: Jack Date: Fri, 1 Nov 2024 12:04:25 -0400 Subject: [PATCH 07/15] removed unnecessary time.Now() call --- server/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/client.go b/server/client.go index 3325361bee0..4cab0c9c3ef 100644 --- a/server/client.go +++ b/server/client.go @@ -3659,7 +3659,7 @@ func (c *client) pruneReplyPerms() { } c.repliesSincePrune = 0 - c.lastReplyPrune = time.Now() + c.lastReplyPrune = now } // pruneDenyCache will prune the deny cache via randomly From 5fb73e9291b362e77e75a5a174104c6ef2b98e8d Mon Sep 17 00:00:00 2001 From: Jack Date: Fri, 1 Nov 2024 14:45:08 -0400 Subject: [PATCH 08/15] only track reply subject perms if the client is not already allowed to publish --- server/client.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/client.go b/server/client.go index 4cab0c9c3ef..3c800aa12db 100644 --- a/server/client.go +++ b/server/client.go @@ -3530,7 +3530,8 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su // If we are tracking dynamic publish permissions that track reply subjects, // do that accounting here. We only look at client.replies which will be non-nil. - if client.replies != nil && len(reply) > 0 { + // Only reply subject permissions if the client is not already allowed to publish to the reply subject. + if client.replies != nil && len(reply) > 0 && !client.pubAllowedFullCheck(string(reply), true, true) { client.replies[string(reply)] = &resp{time.Now(), 0} client.repliesSincePrune++ if client.repliesSincePrune > replyPermLimit || time.Since(client.lastReplyPrune) > replyPruneTime { @@ -3728,7 +3729,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo allowed = np == 0 } - // If we are currently not allowed but we are tracking reply subjects + // If we are tracking reply subjects // dynamically, check to see if we are allowed here but avoid pcache. // We need to acquire the lock though. 
if !allowed && fullCheck && c.perms.resp != nil { From a0646be62daa60310a19de78f7bc6834de9b6bde Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 11 Dec 2024 16:30:12 +0100 Subject: [PATCH 09/15] [FIXED] MaxMsgsPerSubject limit not applied when updating from no value Signed-off-by: Maurice van Veen --- server/filestore.go | 5 ++++- server/memstore.go | 7 +++++-- server/store_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 689e7e08efb..de0dbda7e9c 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -580,6 +580,9 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { if cfg.Storage != FileStorage { return fmt.Errorf("fileStore requires file storage type in config") } + if cfg.MaxMsgsPer < -1 { + cfg.MaxMsgsPer = -1 + } fs.mu.Lock() new_cfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: *cfg} @@ -610,7 +613,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.ageChk = nil } - if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer { + if fs.cfg.MaxMsgsPer > 0 && (old_cfg.MaxMsgsPer == 0 || fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer) { fs.enforceMsgPerSubjectLimit(true) } fs.mu.Unlock() diff --git a/server/memstore.go b/server/memstore.go index 7df364c950c..350cfa388e9 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -84,10 +84,13 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { ms.ageChk = nil } // Make sure to update MaxMsgsPer + if cfg.MaxMsgsPer < -1 { + cfg.MaxMsgsPer = -1 + } maxp := ms.maxp ms.maxp = cfg.MaxMsgsPer - // If the value is smaller we need to enforce that. - if ms.maxp != 0 && ms.maxp < maxp { + // If the value is smaller, or was unset before, we need to enforce that. + if ms.maxp > 0 && (maxp == 0 || ms.maxp < maxp) { lm := uint64(ms.maxp) ms.fss.Iter(func(subj []byte, ss *SimpleState) bool { if ss.Msgs > lm { diff --git a/server/store_test.go b/server/store_test.go index 168b488d62c..a916ceedb89 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -258,3 +258,47 @@ func TestStoreSubjectStateConsistency(t *testing.T) { }, ) } + +func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + for i := 0; i < 5; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + ss := fs.State() + require_Equal(t, ss.Msgs, 5) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 5) + + // Update max messages per-subject from 0 (infinite) to 1. + // Since the per-subject limit was not specified before, messages should be removed upon config update. + cfg := config() + if _, ok := fs.(*fileStore); ok { + cfg.Storage = FileStorage + } else { + cfg.Storage = MemoryStorage + } + cfg.MaxMsgsPer = 1 + err := fs.UpdateConfig(&cfg) + require_NoError(t, err) + + // Only one message should remain. + ss = fs.State() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.FirstSeq, 5) + require_Equal(t, ss.LastSeq, 5) + + // Update max messages per-subject from 0 (infinite) to an invalid value (< -1). 
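			// Both stores share the UpdateConfig behavior exercised here,
			// sketched from the hunks above: values below -1 are normalized,
			// and the limit is enforced when tightened or first set:
			//
			//	if cfg.MaxMsgsPer < -1 {
			//		cfg.MaxMsgsPer = -1
			//	}
			//	enforce := newPer > 0 && (oldPer == 0 || newPer < oldPer)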
+ cfg.MaxMsgsPer = -2 + err = fs.UpdateConfig(&cfg) + require_NoError(t, err) + require_Equal(t, cfg.MaxMsgsPer, -1) + }, + ) +} From 917b60f0ecc7d9ed857de1ac2d897c2ae56256fc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 12 Dec 2024 00:45:08 +0000 Subject: [PATCH 10/15] Bump golang.org/x/crypto from 0.30.0 to 0.31.0 Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.30.0 to 0.31.0. - [Commits](https://github.com/golang/crypto/compare/v0.30.0...v0.31.0) --- updated-dependencies: - dependency-name: golang.org/x/crypto dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a0c6fe81cb2..1ee0d5d86fd 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/nats-io/nkeys v0.4.8 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.30.0 + golang.org/x/crypto v0.31.0 golang.org/x/sys v0.28.0 golang.org/x/time v0.8.0 ) diff --git a/go.sum b/go.sum index cf229b1e0a7..51b9fae1322 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= -golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= From da8d7ef4bbae6c0975f4e5e56825878206796277 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 12 Dec 2024 10:22:54 +0100 Subject: [PATCH 11/15] NRG: Don't mark current/healthy while catching up Signed-off-by: Maurice van Veen --- server/raft.go | 7 +-- server/raft_test.go | 141 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 5 deletions(-) diff --git a/server/raft.go b/server/raft.go index 563af0d11dc..a6dff47e9ff 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1311,11 +1311,6 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { return true } - // Check here on catchup status. - if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex { - n.cancelCatchup() - } - // Check to see that we have heard from the current leader lately. if n.leader != noLeader && n.leader != n.id && n.catchup == nil { okInterval := int64(hbInterval) * 2 @@ -1326,7 +1321,9 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { } } if cs := n.catchup; cs != nil { + // We're actively catching up, can't mark current even if commit==applied. 
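		// The resulting rule, roughly (leader and forward-progress handling
		// above aside):
		//
		//	current := n.catchup == nil && n.commit == n.applied
		//
		// so a follower can no longer report healthy mid-catchup just
		// because commit happened to equal applied at that instant.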
n.debug("Not current, still catching up pindex=%d, cindex=%d", n.pindex, cs.cindex) + return false } if n.commit == n.applied { diff --git a/server/raft_test.go b/server/raft_test.go index 29d50e57db9..0f30dec288e 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1699,3 +1699,144 @@ func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) { require_NoError(t, err) require_Len(t, len(files), 0) } + +func TestNRGHealthCheckWaitForCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + + // Switch follower into catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat.pindex) + + // Catchup first message. + n.processAppendEntry(aeMsg1, n.catchup.sub) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // Catchup second message. + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + require_False(t, n.Healthy()) + + // If we apply the entry sooner than we receive the next catchup message, + // should not mark as healthy since we're still in catchup. + n.Applied(1) + require_False(t, n.Healthy()) + + // Catchup third message. + n.processAppendEntry(aeMsg3, n.catchup.sub) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 2) + n.Applied(2) + require_False(t, n.Healthy()) + + // Heartbeat stops catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_False(t, n.Healthy()) + + // Still need to wait for the last entry to be applied. + n.Applied(3) + require_True(t, n.Healthy()) +} + +func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + + // Switch follower into catchup. 
+ n.processAppendEntry(aeHeartbeat1, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat1.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex) + + // Catchup first message. + n.processAppendEntry(aeMsg1, n.catchup.sub) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // We miss this message, since we're catching up. + n.processAppendEntry(aeMsg3, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // We also miss the heartbeat, since we're catching up. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // Catchup second message, this will stop catchup. + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + n.Applied(1) + require_False(t, n.Healthy()) + + // We expect to still be in catchup, waiting for a heartbeat or new append entry to reset. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.cterm, aeHeartbeat1.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex) + + // We now get a 'future' heartbeat, should restart catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 2) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat2.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat2.pindex) + require_False(t, n.Healthy()) + + // Catchup third message. + n.processAppendEntry(aeMsg3, n.catchup.sub) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 2) + n.Applied(2) + require_False(t, n.Healthy()) + + // Heartbeat stops catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_False(t, n.Healthy()) + + // Still need to wait for the last entry to be applied. + n.Applied(3) + require_True(t, n.Healthy()) +} From e22c15a3de55b363af7c551418d36505b6b7415b Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 12 Dec 2024 15:50:49 +0100 Subject: [PATCH 12/15] [FIXED] Healthz with details returns 200 OK even with errors Signed-off-by: Maurice van Veen --- server/monitor.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 2bd25f9a7be..77a6c1fe71a 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3228,10 +3228,11 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) { Details: includeDetails, }) - code := http.StatusOK + code := hs.StatusCode if hs.Error != _EMPTY_ { s.Warnf("Healthcheck failed: %q", hs.Error) - code = hs.StatusCode + } else if len(hs.Errors) != 0 { + s.Warnf("Healthcheck failed: %d errors", len(hs.Errors)) } // Remove StatusCode from JSON representation when responding via HTTP // since this is already in the response. 
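	// Net effect, in sketch form: the HTTP status now always tracks
	// hs.StatusCode instead of defaulting to 200 OK, so a failing check
	// reports a non-2xx code whether it surfaces as a single hs.Error or,
	// with details requested, as entries in hs.Errors:
	//
	//	code := hs.StatusCode // e.g. 503 while unhealthy, even in detailed mode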
From c350c62e50a831dc0c3a53541837c527a15d65e0 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 13 Dec 2024 09:16:53 +0100 Subject: [PATCH 13/15] Add test for sending consumer info right after consumer create, with paused applies Signed-off-by: Maurice van Veen --- server/jetstream_cluster_1_test.go | 42 ++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 79dcc61be8d..fcc5c243c71 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6812,6 +6812,48 @@ func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) { } } +func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nl := c.randomNonLeader() + nc, js := jsClientConnect(t, nl) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // We pause applies for the server we're connected to. + // This is fine for the RAFT log and allowing the consumer to be created, + // but we will not be able to apply the consumer assignment for some time. + mjs := nl.getJetStream() + require_NotNil(t, js) + mg := mjs.getMetaGroup() + require_NotNil(t, mg) + err = mg.(*raft).PauseApply() + require_NoError(t, err) + + // Add consumer. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + // Consumer info should not fail, this server should not short-circuit because + // it was not able to apply the consumer assignment. + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + + // Resume applies. + mg.(*raft).ResumeApply() + + // Check consumer info still works. + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. From dbb3907ba54c9ad1f506fb516af134ae0b14b72d Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 13 Dec 2024 09:18:16 +0100 Subject: [PATCH 14/15] Revert "[IMPROVED] Bypass meta-layer's API queue for non-existent consumers. (#6176)" This reverts commit eba1953a08b783b55770584e3b8a287da5c5fbfd, reversing changes made to fa50c751ab3cff88f69d760cc76acfd2edceb48d. --- server/client.go | 20 ++-------- server/jetstream.go | 4 -- server/jetstream_api.go | 63 +----------------------------- server/jetstream_cluster.go | 15 +++---- server/jetstream_cluster_1_test.go | 15 +++---- server/norace_test.go | 57 --------------------------- server/server.go | 14 +++---- 7 files changed, 23 insertions(+), 165 deletions(-) diff --git a/server/client.go b/server/client.go index 3c800aa12db..d7a3ccb1252 100644 --- a/server/client.go +++ b/server/client.go @@ -4169,9 +4169,8 @@ func getHeader(key string, hdr []byte) []byte { // For bytes.HasPrefix below. 
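// With the bypass reverted, consumer INFO requests flow through the
// meta-layer API queue again and missing-consumer errors are produced
// there; only the two prefixes below retain an entry-point fast path.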
var ( - jsRequestNextPreB = []byte(jsRequestNextPre) - jsDirectGetPreB = []byte(jsDirectGetPre) - jsConsumerInfoPreB = []byte(JSApiConsumerInfoPre) + jsRequestNextPreB = []byte(jsRequestNextPre) + jsDirectGetPreB = []byte(jsDirectGetPre) ) // processServiceImport is an internal callback when a subscription matches an imported service @@ -4191,16 +4190,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt } } - var checkJS, checkConsumerInfo bool - acc.mu.RLock() + var checkJS bool shouldReturn := si.invalid || acc.sl == nil if !shouldReturn && !isResponse && si.to == jsAllAPI { if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) { checkJS = true - } else if len(c.pa.psi) == 0 && bytes.HasPrefix(c.pa.subject, jsConsumerInfoPreB) { - // Only check if we are clustered and expecting a reply. - checkConsumerInfo = len(c.pa.reply) > 0 && c.srv.JetStreamIsClustered() } } siAcc := si.acc @@ -4214,15 +4209,6 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt return } - // Here we will do a fast check for consumer info only to check if it does not exists. This will spread the - // load to all servers with connected clients since service imports are processed at point of entry. - // Only call for clustered setups. - if checkConsumerInfo && si.se != nil && si.se.acc == c.srv.SystemAccount() { - if c.srv.jsConsumerProcessMissing(c, acc) { - return - } - } - var nrr []byte var rsi *serviceImport diff --git a/server/jetstream.go b/server/jetstream.go index 02920e76a4b..e3f073fa95f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -461,8 +461,6 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { if err := s.enableJetStreamClustering(); err != nil { return err } - // Set our atomic bool to clustered. - s.jsClustered.Store(true) } // Mark when we are up and running. @@ -967,8 +965,6 @@ func (s *Server) shutdownJetStream() { cc.c = nil } cc.meta = nil - // Set our atomic bool to false. - s.jsClustered.Store(false) } js.mu.Unlock() diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 88c06730b6a..de014e74b72 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -158,9 +158,8 @@ const ( // JSApiConsumerInfo is for obtaining general information about a consumer. // Will return JSON response. - JSApiConsumerInfoPre = "$JS.API.CONSUMER.INFO." - JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*" - JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s" + JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*" + JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s" // JSApiConsumerDelete is the endpoint to delete consumers. // Will return JSON response. @@ -973,15 +972,6 @@ func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response) } -// Use the account acc to send actual result from non-system account. 
-func (s *Server) sendAPIErrResponseFromAccount(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
-	acc.trackAPIErr()
-	if reply != _EMPTY_ {
-		s.sendInternalAccountMsg(acc, reply, response)
-	}
-	s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
-}
-
 const errRespDelay = 500 * time.Millisecond
 
 func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup) {
@@ -4243,55 +4233,6 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
 	s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
 }
 
-// This will be a quick check on point of entry for a consumer that does
-// not exist. If that is the case we will return the response and return
-// true which will shortcut the service import to alleviate pressure on
-// the JS API queues.
-func (s *Server) jsConsumerProcessMissing(c *client, acc *Account) bool {
-	subject := bytesToString(c.pa.subject)
-	streamName, consumerName := streamNameFromSubject(subject), consumerNameFromSubject(subject)
-
-	// Check to make sure the consumer is assigned.
-	// All JS servers will have the meta information.
-	js, cc := s.getJetStreamCluster()
-	if js == nil || cc == nil {
-		return false
-	}
-	js.mu.RLock()
-	sa, ca := js.assignments(acc.Name, streamName, consumerName)
-	js.mu.RUnlock()
-
-	// If we have a consumer assignment return false here and let normally processing takeover.
-	if ca != nil {
-		return false
-	}
-
-	// We can't find the consumer, so mimic what would be the errors below.
-	var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
-
-	// Need to make subject and reply real here for queued response processing.
-	subject = string(c.pa.subject)
-	reply := string(c.pa.reply)
-
-	ci := c.getClientInfo(true)
-
-	if hasJS, doErr := acc.checkJetStream(); !hasJS {
-		if doErr {
-			resp.Error = NewJSNotEnabledForAccountError()
-			s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
-		}
-	} else if sa == nil {
-		resp.Error = NewJSStreamNotFoundError()
-		s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
-	} else {
-		// If we are here the consumer is not present.
-		resp.Error = NewJSConsumerNotFoundError()
-		s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
-	}
-
-	return true
-}
-
 // Request for information about an consumer.
 func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
 	if c == nil || !s.JetStreamEnabled() {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index ebcd29c8abb..66b41f2499a 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -224,7 +224,11 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
 }
 
 func (s *Server) JetStreamIsClustered() bool {
-	return s.jsClustered.Load()
+	js := s.getJetStream()
+	if js == nil {
+		return false
+	}
+	return js.isClustered()
 }
 
 func (s *Server) JetStreamIsLeader() bool {
@@ -4740,15 +4744,6 @@ func (js *jetStream) consumerAssignment(account, stream, consumer string) *consu
 	return nil
 }
 
-// Return both the stream and consumer assignments.
-// Lock should be held.
-func (js *jetStream) assignments(account, stream, consumer string) (*streamAssignment, *consumerAssignment) {
-	if sa := js.streamAssignment(account, stream); sa != nil {
-		return sa, sa.consumers[consumer]
-	}
-	return nil, nil
-}
-
 // consumerAssigned informs us if this server has this consumer assigned.
 func (jsa *jsAccount) consumerAssigned(stream, consumer string) bool {
 	jsa.mu.RLock()
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index fcc5c243c71..2ac041b283f 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -3412,6 +3412,7 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) {
 	// Go client will lag so use direct for now.
 	getAccountInfo := func() *nats.AccountInfo {
 		t.Helper()
+
 		info, err := js.AccountInfo()
 		if err != nil {
 			t.Fatalf("Unexpected error: %v", err)
@@ -3436,13 +3437,10 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) {
 	js.ConsumerInfo("TEST-2", "NO-CONSUMER")
 	js.ConsumerInfo("TEST-3", "NO-CONSUMER")
 
-	checkFor(t, 2*time.Second, 250*time.Millisecond, func() error {
-		ai = getAccountInfo()
-		if ai.API.Errors != 4 {
-			return fmt.Errorf("Expected 4 API calls to be errors, got %d", ai.API.Errors)
-		}
-		return nil
-	})
+	ai = getAccountInfo()
+	if ai.API.Errors != 4 {
+		t.Fatalf("Expected 4 API calls to be errors, got %d", ai.API.Errors)
+	}
 }
 
 func TestJetStreamClusterPeerRemovalAPI(t *testing.T) {
@@ -4321,8 +4319,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) {
 	if err := js.DeleteConsumer("NO-Q", "dlc"); !notAvailableErr(err) {
 		t.Fatalf("Expected an 'unavailable' error, got %v", err)
 	}
-	// Since we did not create the consumer our bypass will respond from the local server.
-	if _, err := js.ConsumerInfo("NO-Q", "dlc"); err != nats.ErrConsumerNotFound {
+	if _, err := js.ConsumerInfo("NO-Q", "dlc"); !notAvailableErr(err) {
 		t.Fatalf("Expected an 'unavailable' error, got %v", err)
 	}
 	// Listers
diff --git a/server/norace_test.go b/server/norace_test.go
index 3a67cd9b802..5d54dfb1df1 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -11257,60 +11257,3 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) {
 	require_NoError(t, n.InstallSnapshot(snap))
 	t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap)))
 }
-
-func TestNoRaceJetStreamClusterInfoOnMissingConsumers(t *testing.T) {
-	c := createJetStreamClusterExplicit(t, "R3F", 3)
-	defer c.shutdown()
-
-	nc, js := jsClientConnect(t, c.randomServer())
-	defer nc.Close()
-
-	// Create a stream just so the consumer info processing misses on the consumer only.
-	_, err := js.AddStream(&nats.StreamConfig{
-		Name:     "TEST",
-		Subjects: []string{"foo"},
-		Replicas: 3,
-	})
-	require_NoError(t, err)
-
-	done := make(chan bool)
-	pending := make(chan int, 1)
-
-	// Check to make sure we never have any pending on the API queue.
-	go func() {
-		ml := c.leader()
-		for {
-			select {
-			case <-done:
-				return
-			case <-time.After(100 * time.Millisecond):
-				qlen := ml.jsAPIRoutedReqs.len() + int(ml.jsAPIRoutedReqs.inProgress())
-				if qlen > 0 {
-					pending <- qlen
-					return
-				}
-			}
-		}
-	}()
-
-	wg := sync.WaitGroup{}
-	wg.Add(500)
-	for i := 0; i < 500; i++ {
-		go func() {
-			defer wg.Done()
-			s := c.randomServer()
-			nc, js := jsClientConnect(t, s)
-			defer nc.Close()
-			// Check for non-existent consumers.
-			for c := 0; c < 1000; c++ {
-				_, err := js.ConsumerInfo("TEST", fmt.Sprintf("C-%d", c))
-				require_Error(t, err)
-			}
-		}()
-	}
-	wg.Wait()
-	close(done)
-	if len(pending) > 0 {
-		t.Fatalf("Saw API pending of %d, expected always 0", <-pending)
-	}
-}
diff --git a/server/server.go b/server/server.go
index 81013d1e1b9..e6f6c728d42 100644
--- a/server/server.go
+++ b/server/server.go
@@ -141,10 +141,8 @@ type Server struct {
 	listenerErr        error
 	gacc               *Account
 	sys                *internal
-	sysAcc             atomic.Pointer[Account]
 	js                 atomic.Pointer[jetStream]
 	isMetaLeader       atomic.Bool
-	jsClustered        atomic.Bool
 	accounts           sync.Map
 	tmpAccounts        sync.Map // Temporarily stores accounts that are being built
 	activeAccounts     int32
@@ -1283,7 +1281,6 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
 		if err == nil && s.sys != nil && acc != s.sys.account {
 			// sys.account.clients (including internal client)/respmap/etc... are transferred separately
 			s.sys.account = acc
-			s.sysAcc.Store(acc)
 		}
 		if err != nil {
 			return awcsti, fmt.Errorf("error resolving system account: %v", err)
@@ -1639,7 +1636,13 @@ func (s *Server) SetSystemAccount(accName string) error {
 
 // SystemAccount returns the system account if set.
 func (s *Server) SystemAccount() *Account {
-	return s.sysAcc.Load()
+	var sacc *Account
+	s.mu.RLock()
+	if s.sys != nil {
+		sacc = s.sys.account
+	}
+	s.mu.RUnlock()
+	return sacc
 }
 
 // GlobalAccount returns the global account.
@@ -1711,9 +1714,6 @@ func (s *Server) setSystemAccount(acc *Account) error {
 	s.sys.wg.Add(1)
 	s.mu.Unlock()
 
-	// Store in atomic for fast lookup.
-	s.sysAcc.Store(acc)
-
 	// Register with the account.
 	s.sys.client.registerWithAccount(acc)
 

From aca44bfa4da09e2272beed3c31b254857ee84634 Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Fri, 13 Dec 2024 09:36:57 +0100
Subject: [PATCH 15/15] Re-introduce atomics from #6176

Signed-off-by: Maurice van Veen

---
 server/jetstream.go         |  4 ++++
 server/jetstream_cluster.go |  6 +-----
 server/server.go            | 14 +++++++-------
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/server/jetstream.go b/server/jetstream.go
index e3f073fa95f..02920e76a4b 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -461,6 +461,8 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
 		if err := s.enableJetStreamClustering(); err != nil {
 			return err
 		}
+		// Set our atomic bool to clustered.
+		s.jsClustered.Store(true)
 	}
 
 	// Mark when we are up and running.
@@ -965,6 +967,8 @@ func (s *Server) shutdownJetStream() {
 			cc.c = nil
 		}
 		cc.meta = nil
+		// Set our atomic bool to false.
+ s.jsClustered.Store(false) } js.mu.Unlock() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 66b41f2499a..8f08b1e502a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -224,11 +224,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { } func (s *Server) JetStreamIsClustered() bool { - js := s.getJetStream() - if js == nil { - return false - } - return js.isClustered() + return s.jsClustered.Load() } func (s *Server) JetStreamIsLeader() bool { diff --git a/server/server.go b/server/server.go index e6f6c728d42..81013d1e1b9 100644 --- a/server/server.go +++ b/server/server.go @@ -141,8 +141,10 @@ type Server struct { listenerErr error gacc *Account sys *internal + sysAcc atomic.Pointer[Account] js atomic.Pointer[jetStream] isMetaLeader atomic.Bool + jsClustered atomic.Bool accounts sync.Map tmpAccounts sync.Map // Temporarily stores accounts that are being built activeAccounts int32 @@ -1281,6 +1283,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) if err == nil && s.sys != nil && acc != s.sys.account { // sys.account.clients (including internal client)/respmap/etc... are transferred separately s.sys.account = acc + s.sysAcc.Store(acc) } if err != nil { return awcsti, fmt.Errorf("error resolving system account: %v", err) @@ -1636,13 +1639,7 @@ func (s *Server) SetSystemAccount(accName string) error { // SystemAccount returns the system account if set. func (s *Server) SystemAccount() *Account { - var sacc *Account - s.mu.RLock() - if s.sys != nil { - sacc = s.sys.account - } - s.mu.RUnlock() - return sacc + return s.sysAcc.Load() } // GlobalAccount returns the global account. @@ -1714,6 +1711,9 @@ func (s *Server) setSystemAccount(acc *Account) error { s.sys.wg.Add(1) s.mu.Unlock() + // Store in atomic for fast lookup. + s.sysAcc.Store(acc) + // Register with the account. s.sys.client.registerWithAccount(acc)
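
Patch 13 hinges on pausing RAFT applies: the paused server keeps committing log entries, so the consumer create succeeds cluster-wide, while its own state machine lags behind. That lag is exactly the window in which a local short-circuit would wrongly report the consumer as missing. Below is a rough, self-contained sketch of such a pause gate using a condition variable; the applier type and its PauseApply/ResumeApply methods are simplified stand-ins for illustration, not the server's actual raft implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// applier mimics a state machine that applies committed entries, but can
// be paused so the log keeps growing while application stalls.
type applier struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
	log    []string // entries applied so far
}

func newApplier() *applier {
	a := &applier{}
	a.cond = sync.NewCond(&a.mu)
	return a
}

// PauseApply stops entries from being applied until ResumeApply is called.
func (a *applier) PauseApply() {
	a.mu.Lock()
	a.paused = true
	a.mu.Unlock()
}

// ResumeApply lets any blocked Apply calls proceed again.
func (a *applier) ResumeApply() {
	a.mu.Lock()
	a.paused = false
	a.mu.Unlock()
	a.cond.Broadcast()
}

// Apply blocks while paused, mimicking a server whose RAFT log advances
// while its state machine lags behind.
func (a *applier) Apply(entry string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	for a.paused {
		a.cond.Wait()
	}
	a.log = append(a.log, entry)
}

// applied returns a snapshot of the applied entries.
func (a *applier) applied() []string {
	a.mu.Lock()
	defer a.mu.Unlock()
	return append([]string(nil), a.log...)
}

func main() {
	a := newApplier()
	a.PauseApply()

	done := make(chan struct{})
	go func() {
		a.Apply("consumer-assignment") // committed, but blocked from applying
		close(done)
	}()

	time.Sleep(50 * time.Millisecond)
	fmt.Println("applied while paused:", a.applied()) // []

	a.ResumeApply()
	<-done
	fmt.Println("applied after resume:", a.applied()) // [consumer-assignment]
}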
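Patch 15 keeps the revert but restores the fast-lookup caching from #6176: the authoritative state stays behind the server lock, and a copy is published to an atomic at each state transition (enableJetStream, shutdownJetStream, setSystemAccount), so hot paths such as JetStreamIsClustered and SystemAccount read without locking. A minimal sketch of that pattern follows; the server type and its methods below are hypothetical, and only the jsClustered/sysAcc field names mirror the patch.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// server keeps authoritative state behind a lock, plus atomics that are
// refreshed at every state transition for lock-free reads on hot paths.
type server struct {
	mu        sync.RWMutex
	clustered bool   // authoritative, guarded by mu
	sysName   string // authoritative, guarded by mu

	jsClustered atomic.Bool            // cached copy of clustered
	sysAcc      atomic.Pointer[string] // cached copy of sysName
}

// setClustered runs rarely (enabling/shutting down clustering), so it can
// afford to take the lock and then refresh the cache.
func (s *server) setClustered(v bool) {
	s.mu.Lock()
	s.clustered = v
	s.mu.Unlock()
	s.jsClustered.Store(v)
}

// setSystemAccount mirrors the sysAcc.Store(acc) calls in the patch.
func (s *server) setSystemAccount(name string) {
	s.mu.Lock()
	s.sysName = name
	s.mu.Unlock()
	s.sysAcc.Store(&name)
}

// Hot-path readers never touch the mutex.
func (s *server) isClustered() bool { return s.jsClustered.Load() }

func (s *server) systemAccount() string {
	if p := s.sysAcc.Load(); p != nil {
		return *p
	}
	return ""
}

func main() {
	s := &server{}
	fmt.Println(s.isClustered(), s.systemAccount()) // false ""
	s.setClustered(true)
	s.setSystemAccount("$SYS")
	fmt.Println(s.isClustered(), s.systemAccount()) // true "$SYS"
}

The trade-off is that readers may briefly observe a stale value between the locked update and the atomic store, which is acceptable here because both call sites treat the flag as advisory rather than as a synchronization point.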