diff --git a/go.mod b/go.mod index a0c6fe81cb2..1ee0d5d86fd 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/nats-io/nkeys v0.4.8 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.30.0 + golang.org/x/crypto v0.31.0 golang.org/x/sys v0.28.0 golang.org/x/time v0.8.0 ) diff --git a/go.sum b/go.sum index cf229b1e0a7..51b9fae1322 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= -golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/server/client.go b/server/client.go index d87d122110c..d7a3ccb1252 100644 --- a/server/client.go +++ b/server/client.go @@ -261,6 +261,9 @@ type client struct { last time.Time lastIn time.Time + repliesSincePrune uint16 + lastReplyPrune time.Time + headers bool rtt time.Duration @@ -420,6 +423,7 @@ const ( pruneSize = 32 routeTargetInit = 8 replyPermLimit = 4096 + replyPruneTime = time.Second ) // Represent read cache booleans with a bitmask @@ -3526,9 +3530,11 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su // If we are tracking dynamic publish permissions that track reply subjects, // do that accounting here. 
We only look at client.replies which will be non-nil. - if client.replies != nil && len(reply) > 0 { + // Only track reply subject permissions if the client is not already allowed to publish to the reply subject. + if client.replies != nil && len(reply) > 0 && !client.pubAllowedFullCheck(string(reply), true, true) { client.replies[string(reply)] = &resp{time.Now(), 0} - if len(client.replies) > replyPermLimit { + client.repliesSincePrune++ + if client.repliesSincePrune > replyPermLimit || time.Since(client.lastReplyPrune) > replyPruneTime { client.pruneReplyPerms() } } @@ -3652,6 +3658,9 @@ func (c *client) pruneReplyPerms() { delete(c.replies, k) } } + + c.repliesSincePrune = 0 + c.lastReplyPrune = now } // pruneDenyCache will prune the deny cache via randomly @@ -3720,7 +3729,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo allowed = np == 0 } - // If we are currently not allowed but we are tracking reply subjects + // If we are tracking reply subjects // dynamically, check to see if we are allowed here but avoid pcache. // We need to acquire the lock though. if !allowed && fullCheck && c.perms.resp != nil { @@ -4160,9 +4169,8 @@ func getHeader(key string, hdr []byte) []byte { // For bytes.HasPrefix below. 
var ( - jsRequestNextPreB = []byte(jsRequestNextPre) - jsDirectGetPreB = []byte(jsDirectGetPre) - jsConsumerInfoPreB = []byte(JSApiConsumerInfoPre) + jsRequestNextPreB = []byte(jsRequestNextPre) + jsDirectGetPreB = []byte(jsDirectGetPre) ) // processServiceImport is an internal callback when a subscription matches an imported service @@ -4182,16 +4190,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt } } - var checkJS, checkConsumerInfo bool - acc.mu.RLock() + var checkJS bool shouldReturn := si.invalid || acc.sl == nil if !shouldReturn && !isResponse && si.to == jsAllAPI { if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) { checkJS = true - } else if len(c.pa.psi) == 0 && bytes.HasPrefix(c.pa.subject, jsConsumerInfoPreB) { - // Only check if we are clustered and expecting a reply. - checkConsumerInfo = len(c.pa.reply) > 0 && c.srv.JetStreamIsClustered() } } siAcc := si.acc @@ -4205,15 +4209,6 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt return } - // Here we will do a fast check for consumer info only to check if it does not exists. This will spread the - // load to all servers with connected clients since service imports are processed at point of entry. - // Only call for clustered setups. - if checkConsumerInfo && si.se != nil && si.se.acc == c.srv.SystemAccount() { - if c.srv.jsConsumerProcessMissing(c, acc) { - return - } - } - var nrr []byte var rsi *serviceImport diff --git a/server/consumer.go b/server/consumer.go index a91c1925441..438041ec899 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2667,23 +2667,20 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { TimeStamp: time.Now().UTC(), } - // If we are replicated and we are not the leader or we are filtered, we need to pull certain data from our store. 
- isLeader := o.isLeader() - if rg != nil && rg.node != nil && o.store != nil && (!isLeader || o.isFiltered()) { + // If we are replicated, we need to pull certain data from our store. + if rg != nil && rg.node != nil && o.store != nil { state, err := o.store.BorrowState() if err != nil { o.mu.Unlock() return nil } - if !isLeader { - info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream - info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream + // If we are the leader we could have o.sseq that is skipped ahead. + // To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence. + info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream + info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream + if !o.isLeader() { info.NumAckPending = len(state.Pending) info.NumRedelivered = len(state.Redelivered) - } else { - // Since we are filtered and we are the leader we could have o.sseq that is skipped ahead. - // To maintain consistency in reporting (e.g. jsz) we take the state for our delivered stream sequence. 
- info.Delivered.Stream = state.Delivered.Stream } } diff --git a/server/filestore.go b/server/filestore.go index 8d2bfa07c11..de0dbda7e9c 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -580,6 +580,9 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { if cfg.Storage != FileStorage { return fmt.Errorf("fileStore requires file storage type in config") } + if cfg.MaxMsgsPer < -1 { + cfg.MaxMsgsPer = -1 + } fs.mu.Lock() new_cfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: *cfg} @@ -610,7 +613,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.ageChk = nil } - if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer { + if fs.cfg.MaxMsgsPer > 0 && (old_cfg.MaxMsgsPer == 0 || fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer) { fs.enforceMsgPerSubjectLimit(true) } fs.mu.Unlock() @@ -2312,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor fseq = lseq + 1 for _, subj := range subs { ss, _ := mb.fss.Find(stringToBytes(subj)) - if ss != nil && ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { + mb.recalculateForSubj(subj, ss) } if ss == nil || start > ss.Last || ss.First >= fseq { continue @@ -2442,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(bytesToString(bsubj), ss) } if sseq <= ss.First { update(ss) @@ -2613,10 +2616,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si // Always reset. ss.First, ss.Last, ss.Msgs = 0, 0, 0 - if filter == _EMPTY_ { - filter = fwcs - } - // We do need to figure out the first and last sequences. 
wc := subjectHasWildcard(filter) start, stop := uint32(math.MaxUint32), uint32(0) @@ -2746,8 +2745,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { mb.lsts = time.Now().UnixNano() mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { subj := string(bsubj) - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } oss := fss[subj] if oss.First == 0 { // New @@ -2937,8 +2936,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return } subj := bytesToString(bsubj) - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3225,8 +3224,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // If we already found a partial then don't do anything else. 
return } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3899,8 +3898,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { info.fblk = i } } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } mb.mu.Unlock() // Re-acquire fs lock @@ -4031,8 +4030,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() ss, ok := mb.fss.Find(stringToBytes(subj)) - if ok && ss != nil && ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { + mb.recalculateForSubj(subj, ss) } mb.mu.Unlock() if ss == nil { @@ -7833,24 +7832,14 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- - // Only one left. - if ss.Msgs == 1 { - if seq == ss.Last { - ss.Last = ss.First - } else { - ss.First = ss.Last - } - ss.firstNeedsUpdate = false - return - } - - // We can lazily calculate the first sequence when needed. + // We can lazily calculate the first/last sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalulate the first sequence for this subject in this block. +// Will recalculate the first and/or last sequence for this subject in this block. // Will avoid slower path message lookups and scan the cache directly instead. -func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { +func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { // Need to make sure messages are loaded. 
if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -7858,39 +7847,100 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si } } - // Mark first as updated. - ss.firstNeedsUpdate = false - - startSlot := int(startSeq - mb.cache.fseq) + startSlot := int(ss.First - mb.cache.fseq) + if startSlot < 0 { + startSlot = 0 + } if startSlot >= len(mb.cache.idx) { ss.First = ss.Last return - } else if startSlot < 0 { - startSlot = 0 + } + endSlot := int(ss.Last - mb.cache.fseq) + if endSlot < 0 { + endSlot = 0 + } + if endSlot >= len(mb.cache.idx) || startSlot > endSlot { + return } var le = binary.LittleEndian - for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ { - bi := mb.cache.idx[slot] &^ hbit - if bi == dbit { - // delete marker so skip. - continue + if ss.firstNeedsUpdate { + // Mark first as updated. + ss.firstNeedsUpdate = false + + fseq := ss.First + 1 + if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { + fseq = mbFseq } - li := int(bi) - mb.cache.off - if li >= len(mb.cache.buf) { - ss.First = ss.Last - return + for slot := startSlot; slot < len(mb.cache.idx); slot++ { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. + continue + } + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + ss.First = ss.Last + return + } + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + continue + } + ss.First = seq + if ss.Msgs == 1 { + ss.Last = seq + ss.lastNeedsUpdate = false + return + } + // Skip the start slot ahead, if we need to recalculate last we can stop early. 
+ startSlot = slot + break + } } - buf := mb.cache.buf[li:] - hdr := buf[:msgHdrSize] - slen := int(le.Uint16(hdr[20:])) - if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { - seq := le.Uint64(hdr[4:]) - if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + } + if ss.lastNeedsUpdate { + // Mark last as updated. + ss.lastNeedsUpdate = false + + lseq := ss.Last - 1 + if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq { + lseq = mbLseq + } + for slot := endSlot; slot >= startSlot; slot-- { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. continue } - ss.First = seq - return + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + // Can't overwrite ss.Last, just skip. + return + } + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + continue + } + // Sequence should never be lower, but guard against it nonetheless. + if seq < ss.First { + seq = ss.First + } + ss.Last = seq + if ss.Msgs == 1 { + ss.First = seq + ss.firstNeedsUpdate = false + } + return + } } } } diff --git a/server/filestore_test.go b/server/filestore_test.go index 1be968f8b11..458cef7a740 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { mb.clearCacheAndOffset() // Now call with start sequence of 1, the old one // This will panic without the fix. - mb.recalculateFirstForSubj("foo", 1, ss) + mb.recalculateForSubj("foo", ss) // Make sure it was update properly. 
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 88c06730b6a..de014e74b72 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -158,9 +158,8 @@ const ( // JSApiConsumerInfo is for obtaining general information about a consumer. // Will return JSON response. - JSApiConsumerInfoPre = "$JS.API.CONSUMER.INFO." - JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*" - JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s" + JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*" + JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s" // JSApiConsumerDelete is the endpoint to delete consumers. // Will return JSON response. @@ -973,15 +972,6 @@ func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response) } -// Use the account acc to send actual result from non-system account. -func (s *Server) sendAPIErrResponseFromAccount(ci *ClientInfo, acc *Account, subject, reply, request, response string) { - acc.trackAPIErr() - if reply != _EMPTY_ { - s.sendInternalAccountMsg(acc, reply, response) - } - s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response) -} - const errRespDelay = 500 * time.Millisecond func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup) { @@ -4243,55 +4233,6 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } -// This will be a quick check on point of entry for a consumer that does -// not exist. If that is the case we will return the response and return -// true which will shortcut the service import to alleviate pressure on -// the JS API queues. 
-func (s *Server) jsConsumerProcessMissing(c *client, acc *Account) bool { - subject := bytesToString(c.pa.subject) - streamName, consumerName := streamNameFromSubject(subject), consumerNameFromSubject(subject) - - // Check to make sure the consumer is assigned. - // All JS servers will have the meta information. - js, cc := s.getJetStreamCluster() - if js == nil || cc == nil { - return false - } - js.mu.RLock() - sa, ca := js.assignments(acc.Name, streamName, consumerName) - js.mu.RUnlock() - - // If we have a consumer assignment return false here and let normally processing takeover. - if ca != nil { - return false - } - - // We can't find the consumer, so mimic what would be the errors below. - var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}} - - // Need to make subject and reply real here for queued response processing. - subject = string(c.pa.subject) - reply := string(c.pa.reply) - - ci := c.getClientInfo(true) - - if hasJS, doErr := acc.checkJetStream(); !hasJS { - if doErr { - resp.Error = NewJSNotEnabledForAccountError() - s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) - } - } else if sa == nil { - resp.Error = NewJSStreamNotFoundError() - s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) - } else { - // If we are here the consumer is not present. - resp.Error = NewJSConsumerNotFoundError() - s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) - } - - return true -} - // Request for information about an consumer. 
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ebcd29c8abb..8f08b1e502a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4740,15 +4740,6 @@ func (js *jetStream) consumerAssignment(account, stream, consumer string) *consu return nil } -// Return both the stream and consumer assignments. -// Lock should be held. -func (js *jetStream) assignments(account, stream, consumer string) (*streamAssignment, *consumerAssignment) { - if sa := js.streamAssignment(account, stream); sa != nil { - return sa, sa.consumers[consumer] - } - return nil, nil -} - // consumerAssigned informs us if this server has this consumer assigned. func (jsa *jsAccount) consumerAssigned(stream, consumer string) bool { jsa.mu.RLock() diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 79dcc61be8d..2ac041b283f 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -3412,6 +3412,7 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) { // Go client will lag so use direct for now. 
getAccountInfo := func() *nats.AccountInfo { t.Helper() + info, err := js.AccountInfo() if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3436,13 +3437,10 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) { js.ConsumerInfo("TEST-2", "NO-CONSUMER") js.ConsumerInfo("TEST-3", "NO-CONSUMER") - checkFor(t, 2*time.Second, 250*time.Millisecond, func() error { - ai = getAccountInfo() - if ai.API.Errors != 4 { - return fmt.Errorf("Expected 4 API calls to be errors, got %d", ai.API.Errors) - } - return nil - }) + ai = getAccountInfo() + if ai.API.Errors != 4 { + t.Fatalf("Expected 4 API calls to be errors, got %d", ai.API.Errors) + } } func TestJetStreamClusterPeerRemovalAPI(t *testing.T) { @@ -4321,8 +4319,7 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { if err := js.DeleteConsumer("NO-Q", "dlc"); !notAvailableErr(err) { t.Fatalf("Expected an 'unavailable' error, got %v", err) } - // Since we did not create the consumer our bypass will respond from the local server. - if _, err := js.ConsumerInfo("NO-Q", "dlc"); err != nats.ErrConsumerNotFound { + if _, err := js.ConsumerInfo("NO-Q", "dlc"); !notAvailableErr(err) { t.Fatalf("Expected an 'unavailable' error, got %v", err) } // Listers @@ -6812,6 +6809,48 @@ func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) { } } +func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nl := c.randomNonLeader() + nc, js := jsClientConnect(t, nl) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // We pause applies for the server we're connected to. + // This is fine for the RAFT log and allowing the consumer to be created, + // but we will not be able to apply the consumer assignment for some time. 
+ mjs := nl.getJetStream() + require_NotNil(t, js) + mg := mjs.getMetaGroup() + require_NotNil(t, mg) + err = mg.(*raft).PauseApply() + require_NoError(t, err) + + // Add consumer. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"}) + require_NoError(t, err) + + // Consumer info should not fail, this server should not short-circuit because + // it was not able to apply the consumer assignment. + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + + // Resume applies. + mg.(*raft).ResumeApply() + + // Check consumer info still works. + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value. diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index a5b32c0e7b8..25d1ebea426 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3573,7 +3573,7 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { sub, err := js.PullSubscribe("foo", "C") require_NoError(t, err) - // Publish as many messages as the ack floor check threshold +5. + // Publish as many messages as the ack floor check threshold +5 (what we set ackfloor to later). totalMessages := 55 for i := 0; i < totalMessages; i++ { sendStreamMsg(t, nc, "foo", "HELLO") @@ -3583,19 +3583,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { _, err = sub.Fetch(10) require_NoError(t, err) - // We will grab the state with delivered being 10 and ackfloor being 0 directly. 
- cl := c.consumerLeader(globalAccountName, "TEST", "C") - require_NotNil(t, cl) - - mset, err := cl.GlobalAccount().lookupStream("TEST") - require_NoError(t, err) - o := mset.lookupConsumer("C") - require_NotNil(t, o) - o.mu.RLock() - state, err := o.store.State() - o.mu.RUnlock() - require_NoError(t, err) - require_NotNil(t, state) + // We will initialize the state with delivered being 10 and ackfloor being 0 directly. + // Fetch will asynchronously propagate this state, so can't reliably request this from the leader immediately. + state := &ConsumerState{Delivered: SequencePair{Consumer: 10, Stream: 10}} // Now let messages expire. checkFor(t, 5*time.Second, time.Second, func() error { @@ -3632,17 +3622,35 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { require_NoError(t, o.raftNode().InstallSnapshot(snap)) } - cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C") + cl := c.consumerLeader(globalAccountName, "TEST", "C") + require_NotNil(t, cl) + err = cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C") + require_NoError(t, err) c.waitOnConsumerLeader(globalAccountName, "TEST", "C") checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { ci, err := js.ConsumerInfo("TEST", "C") + if err != nil { + return err + } + // Replicated state should stay the same. + if ci.AckFloor.Stream != 5 && ci.AckFloor.Consumer != 5 { + return fmt.Errorf("replicated AckFloor not correct, expected %d, got %+v", 5, ci.AckFloor) + } + + cl = c.consumerLeader(globalAccountName, "TEST", "C") + mset, err := cl.GlobalAccount().lookupStream("TEST") require_NoError(t, err) + o := mset.lookupConsumer("C") + require_NotNil(t, o) + o.mu.RLock() + defer o.mu.RUnlock() + // Make sure we catch this and adjust. 
- if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 { - return nil + if o.asflr != uint64(totalMessages) && o.adflr != 10 { + return fmt.Errorf("leader AckFloor not correct, expected %d, got %+v", 10, ci.AckFloor) } - return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor) + return nil }) } @@ -5456,12 +5464,18 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { // Want to compare sans cluster details which we know will change due to leader change. // Also last activity for delivered can be slightly off so nil out as well. - checkConsumerInfo := func(a, b *nats.ConsumerInfo) { + checkConsumerInfo := func(a, b *nats.ConsumerInfo, replicated bool) { t.Helper() require_Equal(t, a.Delivered.Consumer, 10) require_Equal(t, a.Delivered.Stream, 10) - require_Equal(t, a.AckFloor.Consumer, 10) - require_Equal(t, a.AckFloor.Stream, 10) + // If replicated, agreed upon state is used. Otherwise, o.asflr and o.adflr would be skipped ahead for R1. + if replicated { + require_Equal(t, a.AckFloor.Consumer, 0) + require_Equal(t, a.AckFloor.Stream, 0) + } else { + require_Equal(t, a.AckFloor.Consumer, 10) + require_Equal(t, a.AckFloor.Stream, 10) + } require_Equal(t, a.NumPending, 40) require_Equal(t, a.NumRedelivered, 0) a.Cluster, b.Cluster = nil, nil @@ -5471,7 +5485,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { } } - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, true) // Memory based. sub, err = js.PullSubscribe("foo", "mem", @@ -5501,7 +5515,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { cib, err = js.ConsumerInfo("TEST", "mem") require_NoError(t, err) - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, true) // Now file based but R1 and server restart. 
sub, err = js.PullSubscribe("foo", "r1", @@ -5535,7 +5549,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { // Created can skew a small bit due to server restart, this is expected. now := time.Now() cia.Created, cib.Created = now, now - checkConsumerInfo(cia, cib) + checkConsumerInfo(cia, cib, false) } func TestJetStreamClusterConsumerDefaultsFromStream(t *testing.T) { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 80fd87a6202..2e47cc857bc 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -1664,14 +1664,20 @@ func TestJetStreamSuperClusterConsumerDeliverNewBug(t *testing.T) { } c.waitOnConsumerLeader("$G", "T", "d") - ci, err = js.ConsumerInfo("T", "d") + + cl := c.consumerLeader(globalAccountName, "T", "d") + mset, err := cl.GlobalAccount().lookupStream("T") require_NoError(t, err) + o := mset.lookupConsumer("d") + require_NotNil(t, o) + o.mu.RLock() + defer o.mu.RUnlock() - if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 100 { - t.Fatalf("Incorrect consumer delivered info: %+v", ci.Delivered) + if o.dseq-1 != 0 || o.sseq-1 != 100 { + t.Fatalf("Incorrect consumer delivered info: dseq=%d, sseq=%d", o.dseq-1, o.sseq-1) } - if ci.NumPending != 0 { - t.Fatalf("Did not expect NumPending, got %d", ci.NumPending) + if np := o.checkNumPending(); np != 0 { + t.Fatalf("Did not expect NumPending, got %d", np) } } diff --git a/server/memstore.go b/server/memstore.go index cdf84a74c80..350cfa388e9 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -84,10 +84,13 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { ms.ageChk = nil } // Make sure to update MaxMsgsPer + if cfg.MaxMsgsPer < -1 { + cfg.MaxMsgsPer = -1 + } maxp := ms.maxp ms.maxp = cfg.MaxMsgsPer - // If the value is smaller we need to enforce that. 
- if ms.maxp != 0 && ms.maxp < maxp { + // If the value is smaller, or was unset before, we need to enforce that. + if ms.maxp > 0 && (maxp == 0 || ms.maxp < maxp) { lm := uint64(ms.maxp) ms.fss.Iter(func(subj []byte, ss *SimpleState) bool { if ss.Msgs > lm { @@ -140,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int return ErrMaxBytes } // If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room. - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } sm, ok := ms.msgs[ss.First] if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) { @@ -427,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje var totalSkipped uint64 // We will track start and end sequences as we go. ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + if fss.firstNeedsUpdate || fss.lastNeedsUpdate { + ms.recalculateForSubj(bytesToString(subj), fss) } if sseq <= fss.First { update(fss) @@ -582,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { fss := make(map[string]SimpleState) ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) { subjs := string(subj) - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subjs, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subjs, ss) } oss := fss[subjs] if oss.First == 0 { // New @@ -672,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo var totalSkipped uint64 // We will track start and end sequences as we go. 
IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + if fss.firstNeedsUpdate || fss.lastNeedsUpdate { + ms.recalculateForSubj(bytesToString(subj), fss) } if sseq <= fss.First { update(fss) @@ -790,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { return } for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs { - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } if !ms.removeMsg(ss.First, false) { break @@ -1006,8 +1009,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { if sm := ms.msgs[seq]; sm != nil { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ - delete(ms.msgs, seq) ms.removeSeqPerSubject(sm.subj, seq) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, seq) } } if purged > ms.state.Msgs { @@ -1095,8 +1099,9 @@ func (ms *memStore) Truncate(seq uint64) error { if sm := ms.msgs[i]; sm != nil { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - delete(ms.msgs, i) ms.removeSeqPerSubject(sm.subj, i) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, i) } } // Reset last. @@ -1262,8 +1267,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store if !ok { continue } - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } if ss.First < fseq { fseq = ss.First @@ -1357,31 +1362,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // If we know we only have 1 msg left don't need to search for next first. 
- if ss.Msgs == 1 { - if seq == ss.Last { - ss.Last = ss.First - } else { - ss.First = ss.Last - } - ss.firstNeedsUpdate = false - } else { - ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate - } + // We can lazily calculate the first/last sequence when needed. + ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalculate the first sequence for this subject in this block. +// Will recalculate the first and/or last sequence for this subject. // Lock should be held. -func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { - tseq := startSeq + 1 - if tseq < ms.state.FirstSeq { - tseq = ms.state.FirstSeq - } - for ; tseq <= ss.Last; tseq++ { - if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { - ss.First = tseq - ss.firstNeedsUpdate = false - return +func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) { + if ss.firstNeedsUpdate { + tseq := ss.First + 1 + if tseq < ms.state.FirstSeq { + tseq = ms.state.FirstSeq + } + for ; tseq <= ss.Last; tseq++ { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.First = tseq + ss.firstNeedsUpdate = false + if ss.Msgs == 1 { + ss.Last = tseq + ss.lastNeedsUpdate = false + return + } + break + } + } + } + if ss.lastNeedsUpdate { + tseq := ss.Last - 1 + if tseq > ms.state.LastSeq { + tseq = ms.state.LastSeq + } + for ; tseq >= ss.First; tseq-- { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.Last = tseq + ss.lastNeedsUpdate = false + if ss.Msgs == 1 { + ss.First = tseq + ss.firstNeedsUpdate = false + } + return + } } } } @@ -1397,7 +1418,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - delete(ms.msgs, seq) if ms.state.Msgs > 0 { ms.state.Msgs-- if ss > ms.state.Bytes { @@ -1422,6 +1442,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { // Remove any per subject tracking. 
ms.removeSeqPerSubject(sm.subj, seq) + // Must delete message after updating per-subject info, to be consistent with file store. + delete(ms.msgs, seq) if ms.scb != nil { // We do not want to hold any locks here. diff --git a/server/monitor.go b/server/monitor.go index 2bd25f9a7be..77a6c1fe71a 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3228,10 +3228,11 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) { Details: includeDetails, }) - code := http.StatusOK + code := hs.StatusCode if hs.Error != _EMPTY_ { s.Warnf("Healthcheck failed: %q", hs.Error) - code = hs.StatusCode + } else if len(hs.Errors) != 0 { + s.Warnf("Healthcheck failed: %d errors", len(hs.Errors)) } // Remove StatusCode from JSON representation when responding via HTTP // since this is already in the response. diff --git a/server/norace_test.go b/server/norace_test.go index 3a67cd9b802..5d54dfb1df1 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -11257,60 +11257,3 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) { require_NoError(t, n.InstallSnapshot(snap)) t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap))) } - -func TestNoRaceJetStreamClusterInfoOnMissingConsumers(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3F", 3) - defer c.shutdown() - - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() - - // Create a stream just so the consumer info processing misses on the consumer only. - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) - - done := make(chan bool) - pending := make(chan int, 1) - - // Check to make sure we never have any pending on the API queue. 
- go func() { - ml := c.leader() - for { - select { - case <-done: - return - case <-time.After(100 * time.Millisecond): - qlen := ml.jsAPIRoutedReqs.len() + int(ml.jsAPIRoutedReqs.inProgress()) - if qlen > 0 { - pending <- qlen - return - } - } - } - }() - - wg := sync.WaitGroup{} - wg.Add(500) - for i := 0; i < 500; i++ { - go func() { - defer wg.Done() - s := c.randomServer() - nc, js := jsClientConnect(t, s) - defer nc.Close() - // Check for non-existent consumers. - for c := 0; c < 1000; c++ { - _, err := js.ConsumerInfo("TEST", fmt.Sprintf("C-%d", c)) - require_Error(t, err) - } - }() - } - wg.Wait() - close(done) - if len(pending) > 0 { - t.Fatalf("Saw API pending of %d, expected always 0", <-pending) - } -} diff --git a/server/raft.go b/server/raft.go index 563af0d11dc..a6dff47e9ff 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1311,11 +1311,6 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { return true } - // Check here on catchup status. - if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex { - n.cancelCatchup() - } - // Check to see that we have heard from the current leader lately. if n.leader != noLeader && n.leader != n.id && n.catchup == nil { okInterval := int64(hbInterval) * 2 @@ -1326,7 +1321,9 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { } } if cs := n.catchup; cs != nil { + // We're actively catching up, can't mark current even if commit==applied. 
n.debug("Not current, still catching up pindex=%d, cindex=%d", n.pindex, cs.cindex) + return false } if n.commit == n.applied { diff --git a/server/raft_test.go b/server/raft_test.go index 29d50e57db9..0f30dec288e 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1699,3 +1699,144 @@ func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) { require_NoError(t, err) require_Len(t, len(files), 0) } + +func TestNRGHealthCheckWaitForCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + + // Switch follower into catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat.pindex) + + // Catchup first message. + n.processAppendEntry(aeMsg1, n.catchup.sub) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // Catchup second message. 
+ n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + require_False(t, n.Healthy()) + + // If we apply the entry sooner than we receive the next catchup message, + // should not mark as healthy since we're still in catchup. + n.Applied(1) + require_False(t, n.Healthy()) + + // Catchup third message. + n.processAppendEntry(aeMsg3, n.catchup.sub) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 2) + n.Applied(2) + require_False(t, n.Healthy()) + + // Heartbeat stops catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_False(t, n.Healthy()) + + // Still need to wait for the last entry to be applied. + n.Applied(3) + require_True(t, n.Healthy()) +} + +func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + + // Switch follower into catchup. 
+ n.processAppendEntry(aeHeartbeat1, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat1.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex) + + // Catchup first message. + n.processAppendEntry(aeMsg1, n.catchup.sub) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // We miss this message, since we're catching up. + n.processAppendEntry(aeMsg3, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // We also miss the heartbeat, since we're catching up. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // Catchup second message, this will stop catchup. + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + n.Applied(1) + require_False(t, n.Healthy()) + + // We expect to still be in catchup, waiting for a heartbeat or new append entry to reset. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.cterm, aeHeartbeat1.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex) + + // We now get a 'future' heartbeat, should restart catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 2) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat2.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat2.pindex) + require_False(t, n.Healthy()) + + // Catchup third message. + n.processAppendEntry(aeMsg3, n.catchup.sub) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 2) + n.Applied(2) + require_False(t, n.Healthy()) + + // Heartbeat stops catchup. 
+ n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_False(t, n.Healthy()) + + // Still need to wait for the last entry to be applied. + n.Applied(3) + require_True(t, n.Healthy()) +} diff --git a/server/store.go b/server/store.go index 72e039816e9..1c8f7f7ec1f 100644 --- a/server/store.go +++ b/server/store.go @@ -166,6 +166,8 @@ type SimpleState struct { // Internal usage for when the first needs to be updated before use. firstNeedsUpdate bool + // Internal usage for when the last needs to be updated before use. + lastNeedsUpdate bool } // LostStreamData indicates msgs that have been lost. diff --git a/server/store_test.go b/server/store_test.go index f7832974b5b..a916ceedb89 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -141,3 +141,164 @@ func TestStoreDeleteRange(t *testing.T) { require_Equal(t, last, 2) require_Equal(t, num, 1) } + +func TestStoreSubjectStateConsistency(t *testing.T) { + testAllStoreAllPermutations( + t, false, + StreamConfig{Name: "TEST", Subjects: []string{"foo"}}, + func(t *testing.T, fs StreamStore) { + getSubjectState := func() SimpleState { + t.Helper() + ss := fs.SubjectsState("foo") + return ss["foo"] + } + var smp StoreMsg + expectFirstSeq := func(eseq uint64) { + t.Helper() + sm, _, err := fs.LoadNextMsg("foo", false, 0, &smp) + require_NoError(t, err) + require_Equal(t, sm.seq, eseq) + } + expectLastSeq := func(eseq uint64) { + t.Helper() + sm, err := fs.LoadLastMsg("foo", &smp) + require_NoError(t, err) + require_Equal(t, sm.seq, eseq) + } + + // Publish an initial batch of messages. + for i := 0; i < 4; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + // Expect 4 msgs, with first=1, last=4. 
+ ss := getSubjectState() + require_Equal(t, ss.Msgs, 4) + require_Equal(t, ss.First, 1) + expectFirstSeq(1) + require_Equal(t, ss.Last, 4) + expectLastSeq(4) + + // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. + removed, err := fs.RemoveMsg(1) + require_NoError(t, err) + require_True(t, removed) + + // Will update first, so corrects to seq 2. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 3) + require_Equal(t, ss.First, 2) + expectFirstSeq(2) + require_Equal(t, ss.Last, 4) + expectLastSeq(4) + + // Remove last message, ss.Last is lazy so will only mark ss.lastNeedsUpdate. + removed, err = fs.RemoveMsg(4) + require_NoError(t, err) + require_True(t, removed) + + // Will update last, so corrects to 3. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 2) + require_Equal(t, ss.First, 2) + expectFirstSeq(2) + require_Equal(t, ss.Last, 3) + expectLastSeq(3) + + // Remove first message again. + removed, err = fs.RemoveMsg(2) + require_NoError(t, err) + require_True(t, removed) + + // Since we only have one message left, must update ss.First and ensure ss.Last equals. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.First, 3) + expectFirstSeq(3) + require_Equal(t, ss.Last, 3) + expectLastSeq(3) + + // Publish some more messages so we can test another scenario. + for i := 0; i < 3; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + // Just check the state is complete again. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 4) + require_Equal(t, ss.First, 3) + expectFirstSeq(3) + require_Equal(t, ss.Last, 7) + expectLastSeq(7) + + // Remove last sequence, ss.Last is lazy so doesn't get updated. + removed, err = fs.RemoveMsg(7) + require_NoError(t, err) + require_True(t, removed) + + // Remove first sequence, ss.First is lazy so doesn't get updated. + removed, err = fs.RemoveMsg(3) + require_NoError(t, err) + require_True(t, removed) + + // Remove (now) first sequence. 
Both ss.First and ss.Last are lazy and both need to be recalculated later. + removed, err = fs.RemoveMsg(5) + require_NoError(t, err) + require_True(t, removed) + + // ss.First and ss.Last should both be recalculated and equal each other. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.First, 6) + expectFirstSeq(6) + require_Equal(t, ss.Last, 6) + expectLastSeq(6) + }, + ) +} + +func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0} + } + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + for i := 0; i < 5; i++ { + _, _, err := fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + } + + ss := fs.State() + require_Equal(t, ss.Msgs, 5) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 5) + + // Update max messages per-subject from 0 (infinite) to 1. + // Since the per-subject limit was not specified before, messages should be removed upon config update. + cfg := config() + if _, ok := fs.(*fileStore); ok { + cfg.Storage = FileStorage + } else { + cfg.Storage = MemoryStorage + } + cfg.MaxMsgsPer = 1 + err := fs.UpdateConfig(&cfg) + require_NoError(t, err) + + // Only one message should remain. + ss = fs.State() + require_Equal(t, ss.Msgs, 1) + require_Equal(t, ss.FirstSeq, 5) + require_Equal(t, ss.LastSeq, 5) + + // Update max messages per-subject from 0 (infinite) to an invalid value (< -1). + cfg.MaxMsgsPer = -2 + err = fs.UpdateConfig(&cfg) + require_NoError(t, err) + require_Equal(t, cfg.MaxMsgsPer, -1) + }, + ) +}