diff --git a/server/errors.json b/server/errors.json index 9293361dba5..7f04b23469a 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1978,5 +1978,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSAtomicPublishInvalidBatchCommitErr", + "code": 400, + "error_code": 10200, + "description": "atomic publish batch commit is invalid", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/filestore.go b/server/filestore.go index 572e4e33dd7..b8e948b065a 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -179,7 +179,9 @@ type fileStore struct { scb StorageUpdateHandler rmcb StorageRemoveMsgHandler pmsgcb ProcessJetStreamMsgHandler - ageChk *time.Timer + ageChk *time.Timer // Timer to expire messages. + ageChkRun bool // Whether message expiration is currently running. + ageChkTime int64 // When the message expiration is scheduled to run. syncTmr *time.Timer cfg FileStreamInfo fcfg FileStoreConfig @@ -354,6 +356,9 @@ const ( // For smaller reuse buffers. Usually being generated during contention on the lead write buffer. // E.g. mirrors/sources etc. defaultSmallBlockSize = 1 * 1024 * 1024 // 1MB + // NOT an actual block size, but used for the sync.Pools, so that we don't allocate huge buffers + // unnecessarily until there are enough writes to justify it. + defaultTinyBlockSize = 1 * 1024 * 256 // 256KB // Maximum size for the encrypted head block. maximumEncryptedBlockSize = 2 * 1024 * 1024 // 2MB // Default for KV based @@ -692,6 +697,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { if fs.ageChk != nil && fs.cfg.MaxAge == 0 { fs.ageChk.Stop() fs.ageChk = nil + fs.ageChkTime = 0 } if fs.cfg.MaxMsgsPer > 0 && (old_cfg.MaxMsgsPer == 0 || fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer) { @@ -944,6 +950,12 @@ func (fs *fileStore) writeStreamMeta() error { } // Pools to recycle the blocks to help with memory pressure. 
+var blkPoolTiny = &sync.Pool{ + New: func() any { + b := [defaultTinyBlockSize]byte{} + return &b + }, +} var blkPoolSmall = &sync.Pool{ New: func() any { b := [defaultSmallBlockSize]byte{} @@ -966,6 +978,8 @@ var blkPoolBig = &sync.Pool{ // Get a new msg block based on sz estimate. func getMsgBlockBuf(sz int) (buf []byte) { switch { + case sz <= defaultTinyBlockSize: + return blkPoolTiny.Get().(*[defaultTinyBlockSize]byte)[:0] case sz <= defaultSmallBlockSize: return blkPoolSmall.Get().(*[defaultSmallBlockSize]byte)[:0] case sz <= defaultMediumBlockSize: @@ -983,6 +997,9 @@ func getMsgBlockBuf(sz int) (buf []byte) { // Recycle the msg block. func recycleMsgBlockBuf(buf []byte) { switch cap(buf) { + case defaultTinyBlockSize: + b := (*[defaultTinyBlockSize]byte)(buf[0:defaultTinyBlockSize]) + blkPoolTiny.Put(b) case defaultSmallBlockSize: b := (*[defaultSmallBlockSize]byte)(buf[0:defaultSmallBlockSize]) blkPoolSmall.Put(b) @@ -1509,6 +1526,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { // To detect gaps from compaction, and to ensure the sequence keeps moving up. 
var last uint64 + var hb [highwayhash.Size64]byte for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize > lbuf { @@ -1545,7 +1563,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { } else { hh.Write(data[slen : dlen-recordHashSize]) } - checksum := hh.Sum(nil) + checksum := hh.Sum(hb[:0]) if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) { truncate(index) return gatherLost(lbuf - index), tombstones, errBadMsg{mb.mfn, "invalid checksum"} @@ -1739,7 +1757,8 @@ func (fs *fileStore) recoverFullState() (rerr error) { buf = buf[:len(buf)-highwayhash.Size64] fs.hh.Reset() fs.hh.Write(buf) - if !bytes.Equal(h, fs.hh.Sum(nil)) { + var hb [highwayhash.Size64]byte + if !bytes.Equal(h, fs.hh.Sum(hb[:0])) { os.Remove(fn) fs.warn("Stream state checksum did not match") return errCorruptState @@ -2541,7 +2560,6 @@ func copyMsgBlocks(src []*msgBlock) []*msgBlock { // GetSeqFromTime looks for the first sequence number that has // the message with >= timestamp. -// FIXME(dlc) - inefficient, and dumb really. Make this better. func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { fs.mu.RLock() lastSeq := fs.state.LastSeq @@ -2561,14 +2579,17 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { lseq := atomic.LoadUint64(&mb.last.seq) var smv StoreMsg - - // Linear search, hence the dumb part.. ts := t.UnixNano() - for seq := fseq; seq <= lseq; seq++ { - sm, _, _ := mb.fetchMsgNoCopy(seq, &smv) - if sm != nil && sm.ts >= ts { - return sm.seq - } + + // Because sort.Search expects range [0,off), we have to manually + // calculate the offset from the first sequence. 
+ off := int(lseq - fseq + 1) + i := sort.Search(off, func(i int) bool { + sm, _, _ := mb.fetchMsgNoCopy(fseq+uint64(i), &smv) + return sm != nil && sm.ts >= ts + }) + if i < off { + return fseq + uint64(i) } return 0 } @@ -4426,6 +4447,8 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t if schedule, ok := getMessageSchedule(hdr); ok && !schedule.IsZero() { fs.scheduling.add(seq, subj, schedule.UnixNano()) fs.lmb.schedules++ + } else { + fs.scheduling.removeSubject(subj) } } @@ -4441,9 +4464,7 @@ func (fs *fileStore) StoreRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t // sooner than initial replica expiry timer set to MaxAge when initializing. if !fs.receivedAny && fs.cfg.MaxAge != 0 && ts > 0 { fs.receivedAny = true - // don't block here by calling expireMsgs directly. - // Instead, set short timeout. - fs.resetAgeChk(int64(time.Millisecond * 50)) + fs.resetAgeChk(0) } fs.mu.Unlock() @@ -5030,6 +5051,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // If we are tracking multiple subjects here make sure we update that accounting. mb.removeSeqPerSubject(sm.subj, seq) fs.removePerSubject(sm.subj) + if fs.ttls != nil { + if ttl, err := getMessageTTL(sm.hdr); err == nil { + expires := time.Duration(sm.ts) + (time.Second * time.Duration(ttl)) + fs.ttls.Remove(seq, int64(expires)) + } + } fifo := seq == atomic.LoadUint64(&mb.first.seq) isLastBlock := mb == fs.lmb @@ -5463,7 +5490,8 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int, isLastBlock bool) error { mb.hh.Reset() mb.hh.Write(hdr[4:20]) mb.hh.Write(data) - checksum := mb.hh.Sum(nil) + var hb [highwayhash.Size64]byte + checksum := mb.hh.Sum(hb[:0]) // Write to msg record. b.Write(checksum) @@ -5823,6 +5851,12 @@ func (fs *fileStore) startAgeChk() { // Lock should be held. func (fs *fileStore) resetAgeChk(delta int64) { + // If we're already expiring messages, it will make sure to reset. 
+ // Don't trigger again, as that could result in many expire goroutines. + if fs.ageChkRun { + return + } + var next int64 = math.MaxInt64 if fs.ttls != nil { next = fs.ttls.GetNextExpiration(next) @@ -5868,6 +5902,14 @@ func (fs *fileStore) resetAgeChk(delta int64) { fireIn = 250 * time.Millisecond } + // If we want to kick the timer to run later than what was assigned before, don't reset it. + // Otherwise, we could get in a situation where the timer is continuously reset, and it never runs. + expires := ats.AccessTime() + fireIn.Nanoseconds() + if fs.ageChkTime > 0 && expires > fs.ageChkTime { + return + } + + fs.ageChkTime = expires if fs.ageChk != nil { fs.ageChk.Reset(fireIn) } else { @@ -5880,6 +5922,7 @@ func (fs *fileStore) cancelAgeChk() { if fs.ageChk != nil { fs.ageChk.Stop() fs.ageChk = nil + fs.ageChkTime = 0 } } @@ -5890,18 +5933,22 @@ func (fs *fileStore) expireMsgs() { var smv StoreMsg var sm *StoreMsg - fs.mu.RLock() + fs.mu.Lock() maxAge := int64(fs.cfg.MaxAge) minAge := ats.AccessTime() - maxAge rmcb := fs.rmcb pmsgcb := fs.pmsgcb sdmTTL := int64(fs.cfg.SubjectDeleteMarkerTTL.Seconds()) sdmEnabled := sdmTTL > 0 - fs.mu.RUnlock() + // If SDM is enabled, but handlers aren't set up yet. Try again later. if sdmEnabled && (rmcb == nil || pmsgcb == nil) { + fs.resetAgeChk(0) + fs.mu.Unlock() return } + fs.ageChkRun = true + fs.mu.Unlock() if maxAge > 0 { var seq uint64 @@ -5917,7 +5964,7 @@ func (fs *fileStore) expireMsgs() { // if it was the last message of that particular subject that we just deleted. 
if sdmEnabled { if last, ok := fs.shouldProcessSdm(seq, sm.subj); ok { - sdm := last && isSubjectDeleteMarker(sm.hdr) + sdm := last && !isSubjectDeleteMarker(sm.hdr) fs.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL) } } else { @@ -5929,95 +5976,92 @@ func (fs *fileStore) expireMsgs() { minAge = ats.AccessTime() - maxAge } } + var ageDelta int64 + if sm != nil { + ageDelta = sm.ts - minAge + } fs.mu.Lock() defer fs.mu.Unlock() // TODO: Not great that we're holding the lock here, but the timed hash wheel isn't thread-safe. nextTTL := int64(math.MaxInt64) - var rmSeqs []uint64 - var ttlSdm map[string][]SDMBySubj + var rmSeqs []thw.HashWheelEntry if fs.ttls != nil { fs.ttls.ExpireTasks(func(seq uint64, ts int64) bool { - // Need to grab subject for the specified sequence if for SDM, and check - // if the message hasn't been removed in the meantime. - sm, _ = fs.msgForSeqLocked(seq, &smv, false) - if sm == nil { - return true - } - - if sdmEnabled { - if ttlSdm == nil { - ttlSdm = make(map[string][]SDMBySubj, 1) - } - ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)}) - } else { - // Collect sequences to remove. Don't remove messages inline here, - // as that releases the lock and THW is not thread-safe. - rmSeqs = append(rmSeqs, seq) - } - // Removing messages out of band, those can fail, and we can be shutdown halfway + rmSeqs = append(rmSeqs, thw.HashWheelEntry{Seq: seq, Expires: ts}) + // We might need to remove messages out of band, those can fail, and we can be shutdown halfway // through so don't remove from THW just yet. return false }) - if maxAge > 0 { - // Only check if we're expiring something in the next MaxAge interval, saves us a bit - // of work if MaxAge will beat us to the next expiry anyway. 
- nextTTL = fs.ttls.GetNextExpiration(time.Now().Add(time.Duration(maxAge)).UnixNano()) - } else { - nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64) - } + nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64) } // Remove messages collected by THW. - for _, seq := range rmSeqs { - fs.removeMsg(seq, false, false, false) - } - - // THW is unordered, so must sort by sequence and must not be holding the lock. - if len(ttlSdm) > 0 { + if !sdmEnabled { + for _, rm := range rmSeqs { + fs.removeMsg(rm.Seq, false, false, false) + } + } else { + // THW is unordered, so must sort by sequence and must not be holding the lock. fs.mu.Unlock() - for subj, es := range ttlSdm { - slices.SortFunc(es, func(a, b SDMBySubj) int { - if a.seq == b.seq { - return 0 - } else if a.seq < b.seq { - return -1 - } else { - return 1 - } - }) - for _, e := range es { - if last, ok := fs.shouldProcessSdm(e.seq, subj); ok { - sdm := last && !e.sdm - fs.handleRemovalOrSdm(e.seq, subj, sdm, sdmTTL) - } + slices.SortFunc(rmSeqs, func(a, b thw.HashWheelEntry) int { + if a.Seq == b.Seq { + return 0 + } else if a.Seq < b.Seq { + return -1 + } else { + return 1 + } + }) + for _, rm := range rmSeqs { + // Need to grab subject for the specified sequence if for SDM, and check + // if the message hasn't been removed in the meantime. + // We need to grab the message and check if we should process SDM while holding the lock, + // otherwise we can race if a deletion of this message is in progress. + fs.mu.Lock() + sm, _ = fs.msgForSeqLocked(rm.Seq, &smv, false) + if sm == nil { + fs.ttls.Remove(rm.Seq, rm.Expires) + fs.mu.Unlock() + continue + } + last, ok := fs.shouldProcessSdmLocked(rm.Seq, sm.subj) + fs.mu.Unlock() + if ok { + sdm := last && !isSubjectDeleteMarker(sm.hdr) + fs.handleRemovalOrSdm(rm.Seq, sm.subj, sdm, sdmTTL) } } fs.mu.Lock() } // Only cancel if no message left, not on potential lookup error that would result in sm == nil. 
+ fs.ageChkRun, fs.ageChkTime = false, 0 if fs.state.Msgs == 0 && nextTTL == math.MaxInt64 { fs.cancelAgeChk() } else { - if sm == nil { - fs.resetAgeChk(0) - } else { - fs.resetAgeChk(sm.ts - minAge) - } + fs.resetAgeChk(ageDelta) } } func (fs *fileStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) { fs.mu.Lock() defer fs.mu.Unlock() + return fs.shouldProcessSdmLocked(seq, subj) +} +// Lock should be held. +func (fs *fileStore) shouldProcessSdmLocked(seq uint64, subj string) (bool, bool) { if fs.sdm == nil { fs.sdm = newSDMMeta() } if p, ok := fs.sdm.pending[seq]; ok { + // Don't allow more proposals for the same sequence if we already did recently. + if time.Since(time.Unix(0, p.ts)) < 2*time.Second { + return p.last, false + } // If we're about to use the cached value, and we knew it was last before, // quickly check that we don't have more remaining messages for the subject now. // Which means we are not the last anymore and must reset to not remove later data. @@ -6028,11 +6072,6 @@ func (fs *fileStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) { p.last = false } } - - // Don't allow more proposals for the same sequence if we already did recently. - if time.Since(time.Unix(0, p.ts)) < 2*time.Second { - return p.last, false - } fs.sdm.pending[seq] = SDMBySeq{p.last, time.Now().UnixNano()} return p.last, true } @@ -6072,9 +6111,15 @@ func (fs *fileStore) runMsgScheduling() { fs.mu.Lock() defer fs.mu.Unlock() - if fs.scheduling == nil || fs.pmsgcb == nil { + // If scheduling is enabled, but handler isn't set up yet. Try again later. 
+ if fs.scheduling == nil { return } + if fs.pmsgcb == nil { + fs.scheduling.resetTimer() + return + } + fs.scheduling.running = true scheduledMsgs := fs.scheduling.getScheduledMessages(func(seq uint64, smv *StoreMsg) *StoreMsg { sm, _ := fs.msgForSeqLocked(seq, smv, false) @@ -6088,9 +6133,8 @@ func (fs *fileStore) runMsgScheduling() { fs.mu.Lock() } - if fs.scheduling != nil { - fs.scheduling.resetTimer() - } + fs.scheduling.running, fs.scheduling.deadline = false, 0 + fs.scheduling.resetTimer() } // Lock should be held. @@ -6219,6 +6263,17 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg // reference. It will now stay strong until the flusher decides it is time to weaken. mb.ecache.Strengthen() + // Make sure we have enough space to write into. If we don't then we can pull a buffer + // from the next pool size up to save us from reallocating in append() below. + if nsz := len(mb.cache.buf) + int(rl); cap(mb.cache.buf) < nsz { + prev := mb.cache.buf + mb.cache.buf = getMsgBlockBuf(nsz) + if prev != nil { + mb.cache.buf = mb.cache.buf[:copy(mb.cache.buf[:nsz], prev)] + recycleMsgBlockBuf(prev) + } + } + // Indexing index := len(mb.cache.buf) @@ -6835,14 +6890,25 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock { fs.mu.RLock() defer fs.mu.RUnlock() - t := minTime.UnixNano() - for _, mb := range fs.blks { + // Binary search for first block where last.ts >= t. + i, _ := slices.BinarySearchFunc(fs.blks, minTime.UnixNano(), func(mb *msgBlock, target int64) int { mb.mu.RLock() - found := t <= mb.last.ts + last := mb.last.ts mb.mu.RUnlock() - if found { - return mb + switch { + case last < target: + return -1 + case last > target: + return 1 + default: + return 0 } + }) + + // BinarySearchFunc returns an insertion point if not found. + // Either way, i is the index of the first mb where mb.last.ts >= t. 
+ if i < len(fs.blks) { + return fs.blks[i] } return nil } @@ -7669,7 +7735,8 @@ func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCop } else { hh.Write(data[slen : dlen-recordHashSize]) } - if !bytes.Equal(hh.Sum(nil), data[len(data)-8:]) { + var hb [highwayhash.Size64]byte + if !bytes.Equal(hh.Sum(hb[:0]), data[len(data)-8:]) { return nil, errBadMsg{mb.mfn, "invalid checksum"} } } diff --git a/server/filestore_test.go b/server/filestore_test.go index 5ae0881ad21..80b4ce5a0fb 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -10534,3 +10534,55 @@ func TestFileStoreCorruptedNonOrderedSequences(t *testing.T) { }) } } + +func BenchmarkFileStoreGetSeqFromTime(b *testing.B) { + fs, err := newFileStore( + FileStoreConfig{ + StoreDir: b.TempDir(), + BlockSize: 16, + }, + StreamConfig{ + Name: "foo", + Subjects: []string{"foo.>"}, + Storage: FileStorage, + }, + ) + require_NoError(b, err) + defer fs.Stop() + + for range 4096 { + _, _, err := fs.StoreMsg("foo.bar", nil, []byte{1, 2, 3, 4, 5}, 0) + require_NoError(b, err) + } + + fs.mu.RLock() + fs.blks[0].mu.RLock() + fs.lmb.mu.RLock() + start := time.Unix(0, fs.blks[0].first.ts) + middle := time.Unix(0, fs.blks[0].first.ts+(fs.lmb.last.ts-fs.blks[0].first.ts)/2) + end := time.Unix(0, fs.lmb.last.ts) + fs.blks[0].mu.RUnlock() + fs.lmb.mu.RUnlock() + fs.mu.RUnlock() + + b.Run("Start", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + fs.GetSeqFromTime(start) + } + }) + + b.Run("Middle", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + fs.GetSeqFromTime(middle) + } + }) + + b.Run("End", func(b *testing.B) { + b.ReportAllocs() + for range b.N { + fs.GetSeqFromTime(end) + } + }) +} diff --git a/server/gsl/gsl.go b/server/gsl/gsl.go index 9fa413d7b67..55f9bad98bd 100644 --- a/server/gsl/gsl.go +++ b/server/gsl/gsl.go @@ -408,6 +408,9 @@ func (n *node[T]) isEmpty() bool { // Return the number of nodes for the given level. 
func (l *level[T]) numNodes() int { + if l == nil { + return 0 + } num := len(l.nodes) if l.pwc != nil { num++ @@ -489,39 +492,49 @@ func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T if len(nsubj) > 0 { nsubj = append(subj, '.') } - switch { - case r.fwc != nil: + if r.fwc != nil { // We've reached a full wildcard, do a FWC match on the stree at this point // and don't keep iterating downward. nsubj := append(nsubj, '>') st.Match(nsubj, cb) - case r.pwc != nil: + return + } + if r.pwc != nil { // We've found a partial wildcard. We'll keep iterating downwards, but first // check whether there's interest at this level (without triggering dupes) and // match if so. + var done bool nsubj := append(nsubj, '*') if len(r.pwc.subs) > 0 { st.Match(nsubj, cb) + done = true } - if r.pwc.next != nil && r.pwc.next.numNodes() > 0 { + if r.pwc.next.numNodes() > 0 { intersectStree(st, r.pwc.next, nsubj, cb) } - default: - // Normal node with subject literals, keep iterating. - for t, n := range r.nodes { - nsubj := append(nsubj, t...) - if len(n.subs) > 0 { - if subjectHasWildcard(bytesToString(nsubj)) { - st.Match(nsubj, cb) - } else { - if e, ok := st.Find(nsubj); ok { - cb(nsubj, e) - } + if done { + return + } + } + // Normal node with subject literals, keep iterating. + for t, n := range r.nodes { + if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 { + // A wildcard at the next level will already visit these descendents + // so skip so we don't callback the same subject more than once. + continue + } + nsubj := append(nsubj, t...) 
+ if len(n.subs) > 0 { + if subjectHasWildcard(bytesToString(nsubj)) { + st.Match(nsubj, cb) + } else { + if e, ok := st.Find(nsubj); ok { + cb(nsubj, e) } } - if n.next != nil && n.next.numNodes() > 0 { - intersectStree(st, n.next, nsubj, cb) - } + } + if n.next.numNodes() > 0 { + intersectStree(st, n.next, nsubj, cb) } } } diff --git a/server/gsl/gsl_test.go b/server/gsl/gsl_test.go index 7ae5d2bb26c..f8b149ed2a8 100644 --- a/server/gsl/gsl_test.go +++ b/server/gsl/gsl_test.go @@ -397,6 +397,35 @@ func TestGenericSublistInterestBasedIntersection(t *testing.T) { require_NoDuplicates(t, got) }) + t.Run("PWCExtended", func(t *testing.T) { + got := map[string]int{} + sl := NewSublist[int]() + require_NoError(t, sl.Insert("stream.*.child", 11)) + require_NoError(t, sl.Insert("stream.A", 22)) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 2) + require_NoDuplicates(t, got) + }) + + t.Run("PWCExtendedAggressive", func(t *testing.T) { + got := map[string]int{} + sl := NewSublist[int]() + require_NoError(t, sl.Insert("stream.A.child", 11)) + require_NoError(t, sl.Insert("*.A.child", 22)) + require_NoError(t, sl.Insert("stream.*.child", 22)) + require_NoError(t, sl.Insert("stream.A.*", 22)) + require_NoError(t, sl.Insert("stream.*.*", 22)) + require_NoError(t, sl.Insert("*.A.*", 22)) + require_NoError(t, sl.Insert("*.*.child", 22)) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 1) + require_NoDuplicates(t, got) + }) + t.Run("FWCAll", func(t *testing.T) { got := map[string]int{} sl := NewSublist[int]() diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 7e21bc1c4c7..b085080cf27 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -3931,6 +3931,8 @@ func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError if apiErr != nil { return apiErr } + jsa.js.mu.RLock() + defer 
jsa.js.mu.RUnlock() jsa.mu.RLock() defer jsa.mu.RUnlock() if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams { diff --git a/server/jetstream_batching_test.go b/server/jetstream_batching_test.go index 5bf0a2a21db..c00d3c5fdf6 100644 --- a/server/jetstream_batching_test.go +++ b/server/jetstream_batching_test.go @@ -20,6 +20,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/big" "strconv" "strings" @@ -2567,3 +2568,125 @@ func TestJetStreamAtomicBatchPublishPersistModeAsync(t *testing.T) { _, err := jsStreamCreate(t, nc, cfg) require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("async persist mode is not supported with atomic batch publish"))) } + +func TestJetStreamAtomicBatchPublishExpectedLastSubjectSequence(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Storage: FileStorage, + Replicas: 1, + AllowAtomicPublish: true, + } + _, err := jsStreamCreate(t, nc, cfg) + require_NoError(t, err) + + _, err = js.Publish("foo.A", nil) + require_NoError(t, err) + _, err = js.Publish("foo.B", nil) + require_NoError(t, err) + + m := nats.NewMsg("foo.A") + m.Header.Set("Nats-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "1") + m.Header.Set("Nats-Expected-Last-Sequence", "2") + m.Header.Set("Nats-Expected-Last-Subject-Sequence", "1") + m.Header.Set("Nats-Expected-Last-Subject-Sequence-Subject", "foo.A") + msg, err := nc.RequestMsg(m, time.Second) + require_NoError(t, err) + require_Len(t, len(msg.Data), 0) // Empty ack. 
+ + m = nats.NewMsg("foo.B") + m.Header.Set("Nats-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "2") + m.Header.Set("Nats-Batch-Commit", "1") + m.Header.Set("Nats-Expected-Last-Subject-Sequence", "2") + m.Header.Set("Nats-Expected-Last-Subject-Sequence-Subject", "foo.B") + msg, err = nc.RequestMsg(m, time.Second) + require_NoError(t, err) + var resp JSPubAckResponse + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_True(t, resp.Error == nil) + require_Equal(t, resp.PubAck.Sequence, 4) + require_Equal(t, resp.PubAck.BatchId, "uuid") + require_Equal(t, resp.PubAck.BatchSize, 2) +} + +func TestJetStreamAtomicBatchPublishCommitUnsupported(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + cfg := &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: MemoryStorage, + Replicas: 1, + AllowAtomicPublish: true, + } + _, err := jsStreamCreate(t, nc, cfg) + require_NoError(t, err) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + var resp JSPubAckResponse + for _, unsupportedCommit := range []string{"", "unsupported", "0"} { + m := nats.NewMsg("foo") + m.Header.Set("Nats-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "1") + _, err = nc.RequestMsg(m, time.Second) + require_NoError(t, err) + + m.Header.Set("Nats-Batch-Sequence", "2") + m.Header.Set("Nats-Batch-Commit", unsupportedCommit) + msg, err := nc.RequestMsg(m, time.Second) + require_NoError(t, err) + resp = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_True(t, resp.Error != nil) + require_Error(t, resp.Error, NewJSAtomicPublishInvalidBatchCommitError()) + + // Confirm no batches are left. 
+ mset.mu.RLock() + batches := mset.batches + mset.mu.RUnlock() + batches.mu.Lock() + groups := len(batches.group) + batches.mu.Unlock() + require_Len(t, groups, 0) + } + + // The required API level should allow the batch to be rejected. + m := nats.NewMsg("foo") + m.Header.Set("Nats-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "1") + m.Header.Set("Nats-Batch-Commit", "1") + m.Header.Set("Nats-Required-Api-Level", strconv.Itoa(math.MaxInt)) + msg, err := nc.RequestMsg(m, time.Second) + require_NoError(t, err) + resp = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_True(t, resp.Error != nil) + require_Error(t, resp.Error, NewJSRequiredApiLevelError()) + + // If required API level check passes, the header should be stripped. + m.Header.Set("Nats-Required-Api-Level", "0") + msg, err = nc.RequestMsg(m, time.Second) + require_NoError(t, err) + resp = JSPubAckResponse{} + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_True(t, resp.Error == nil) + require_Equal(t, resp.PubAck.Sequence, 1) + + sm, err := mset.getMsg(1) + require_NoError(t, err) + require_Len(t, len(sliceHeader(JSRequiredApiLevel, sm.Header)), 0) +} diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index cd75089ab89..bdd263da39d 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -18,7 +18,9 @@ package server import ( "encoding/json" "fmt" + "math" "math/rand" + "strconv" "sync" "sync/atomic" "testing" @@ -991,6 +993,90 @@ func BenchmarkJetStreamPublish(b *testing.B) { } } +func BenchmarkJetStreamMetaSnapshot(b *testing.B) { + c := createJetStreamClusterExplicit(b, "R3S", 3) + defer c.shutdown() + + setup := func(reqLevel string) *jetStream { + ml := c.leader() + acc, js := ml.globalAccount(), ml.getJetStream() + n := js.getMetaGroup() + + // Create all streams and consumers. 
+ numStreams := 200 + numConsumers := 500 + ci := &ClientInfo{Cluster: "R3S", Account: globalAccountName} + js.mu.Lock() + metadata := map[string]string{JSRequiredLevelMetadataKey: reqLevel} + for i := 0; i < numStreams; i++ { + scfg := &StreamConfig{ + Name: fmt.Sprintf("STREAM-%d", i), + Subjects: []string{fmt.Sprintf("SUBJECT-%d", i)}, + Storage: MemoryStorage, + Metadata: metadata, + } + cfg, _ := ml.checkStreamCfg(scfg, acc, false) + rg, _ := js.createGroupForStream(ci, &cfg) + sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: &cfg, Client: ci, Created: time.Now().UTC()} + n.Propose(encodeAddStreamAssignment(sa)) + + for j := 0; j < numConsumers; j++ { + ccfg := &ConsumerConfig{ + Durable: fmt.Sprintf("CONSUMER-%d", j), + MemoryStorage: true, + Metadata: metadata, + } + selectedLimits, _, _, _ := acc.selectLimits(ccfg.replicas(&cfg)) + srvLim := &ml.getOpts().JetStreamLimits + setConsumerConfigDefaults(ccfg, &cfg, srvLim, selectedLimits, false) + rg = js.cluster.createGroupForConsumer(ccfg, sa) + ca := &consumerAssignment{Group: rg, Stream: cfg.Name, Name: ccfg.Durable, Config: ccfg, Client: ci, Created: time.Now().UTC()} + n.Propose(encodeAddConsumerAssignment(ca)) + } + } + js.mu.Unlock() + + // Wait for all servers to have created all assets. 
+ checkFor(b, 20*time.Second, 200*time.Millisecond, func() error { + for _, s := range c.servers { + sjs := s.getJetStream() + sjs.mu.RLock() + streams := sjs.cluster.streams[globalAccountName] + if len(streams) != numStreams { + sjs.mu.RUnlock() + return fmt.Errorf("expected %d streams, got %d", numStreams, len(streams)) + } + for _, sa := range streams { + if nc := len(sa.consumers); nc != numConsumers { + sjs.mu.RUnlock() + return fmt.Errorf("expected %d consumers, got %d", numConsumers, nc) + } + } + sjs.mu.RUnlock() + } + return nil + }) + return js + } + + for _, t := range []struct { + title string + reqLevel string + }{ + {title: "Default", reqLevel: "0"}, + {title: "AllUnsupported", reqLevel: strconv.Itoa(math.MaxInt)}, + } { + b.Run(t.title, func(b *testing.B) { + js := setup(t.reqLevel) + b.ResetTimer() + for range b.N { + js.metaSnapshot() + } + b.StopTimer() + }) + } +} + func BenchmarkJetStreamCounters(b *testing.B) { const ( verbose = false diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c1a542acf58..725b842ab8d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "math" "math/rand" "os" @@ -33,6 +32,7 @@ import ( "sync/atomic" "time" + "github.com/antithesishq/antithesis-sdk-go/assert" "github.com/klauspost/compress/s2" "github.com/minio/highwayhash" "github.com/nats-io/nuid" @@ -136,14 +136,15 @@ type raftGroup struct { // streamAssignment is what the meta controller uses to assign streams to peers. 
type streamAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Config *StreamConfig `json:"stream"` - Group *raftGroup `json:"group"` - Sync string `json:"sync"` - Subject string `json:"subject,omitempty"` - Reply string `json:"reply,omitempty"` - Restore *StreamState `json:"restore_state,omitempty"` + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + ConfigJSON json.RawMessage `json:"stream"` + Config *StreamConfig `json:"-"` + Group *raftGroup `json:"group"` + Sync string `json:"sync"` + Subject string `json:"subject,omitempty"` + Reply string `json:"reply,omitempty"` + Restore *StreamState `json:"restore_state,omitempty"` // Internal consumers map[string]*consumerAssignment responded bool @@ -155,14 +156,13 @@ type streamAssignment struct { } type unsupportedStreamAssignment struct { - json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. reason string info StreamInfo sysc *client infoSub *subscription } -func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte) *unsupportedStreamAssignment { +func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment) *unsupportedStreamAssignment { reason := "stopped" if sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata) { if req := getRequiredApiLevel(sa.Config.Metadata); req != _EMPTY_ { @@ -170,7 +170,6 @@ func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, json []byte } } return &unsupportedStreamAssignment{ - json: json, reason: reason, info: StreamInfo{ Created: sa.Created, @@ -215,15 +214,16 @@ func (usa *unsupportedStreamAssignment) closeInfoSub(s *Server) { // consumerAssignment is what the meta controller uses to assign consumers to streams. 
type consumerAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Name string `json:"name"` - Stream string `json:"stream"` - Config *ConsumerConfig `json:"consumer"` - Group *raftGroup `json:"group"` - Subject string `json:"subject,omitempty"` - Reply string `json:"reply,omitempty"` - State *ConsumerState `json:"state,omitempty"` + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + Name string `json:"name"` + Stream string `json:"stream"` + ConfigJSON json.RawMessage `json:"consumer"` + Config *ConsumerConfig `json:"-"` + Group *raftGroup `json:"group"` + Subject string `json:"subject,omitempty"` + Reply string `json:"reply,omitempty"` + State *ConsumerState `json:"state,omitempty"` // Internal responded bool recovering bool @@ -234,14 +234,13 @@ type consumerAssignment struct { } type unsupportedConsumerAssignment struct { - json []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. reason string info ConsumerInfo sysc *client infoSub *subscription } -func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsupportedConsumerAssignment { +func newUnsupportedConsumerAssignment(ca *consumerAssignment) *unsupportedConsumerAssignment { reason := "stopped" if ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata) { if req := getRequiredApiLevel(ca.Config.Metadata); req != _EMPTY_ { @@ -249,7 +248,6 @@ func newUnsupportedConsumerAssignment(ca *consumerAssignment, json []byte) *unsu } } return &unsupportedConsumerAssignment{ - json: json, reason: reason, info: ConsumerInfo{ Stream: ca.Stream, @@ -294,35 +292,13 @@ func (uca *unsupportedConsumerAssignment) closeInfoSub(s *Server) { } type writeableConsumerAssignment struct { - consumerAssignment - // Internal - unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. 
-} - -func (wca *writeableConsumerAssignment) MarshalJSON() ([]byte, error) { - if wca.unsupportedJson != nil { - return wca.unsupportedJson, nil - } - return json.Marshal(wca.consumerAssignment) -} - -func (wca *writeableConsumerAssignment) UnmarshalJSON(data []byte) error { - var unsupported bool - var ca consumerAssignment - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&ca); err != nil { - unsupported = true - ca = consumerAssignment{} - if err = json.Unmarshal(data, &ca); err != nil { - return err - } - } - wca.consumerAssignment = ca - if unsupported || (wca.Config != nil && !supportsRequiredApiLevel(wca.Config.Metadata)) { - wca.unsupportedJson = data - } - return nil + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + Name string `json:"name"` + Stream string `json:"stream"` + ConfigJSON json.RawMessage `json:"consumer"` + Group *raftGroup `json:"group"` + State *ConsumerState `json:"state,omitempty"` } // streamPurge is what the stream leader will replicate when purging a stream. @@ -1542,44 +1518,12 @@ func (js *jetStream) checkClusterSize() { // Represents our stable meta state that we can write out. type writeableStreamAssignment struct { - backingStreamAssignment - // Internal - unsupportedJson []byte // The raw JSON content of the assignment, if it's unsupported due to the required API level. 
-} - -type backingStreamAssignment struct { - Client *ClientInfo `json:"client,omitempty"` - Created time.Time `json:"created"` - Config *StreamConfig `json:"stream"` - Group *raftGroup `json:"group"` - Sync string `json:"sync"` - Consumers []*writeableConsumerAssignment -} - -func (wsa *writeableStreamAssignment) MarshalJSON() ([]byte, error) { - if wsa.unsupportedJson != nil { - return wsa.unsupportedJson, nil - } - return json.Marshal(wsa.backingStreamAssignment) -} - -func (wsa *writeableStreamAssignment) UnmarshalJSON(data []byte) error { - var unsupported bool - var bsa backingStreamAssignment - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&bsa); err != nil { - unsupported = true - bsa = backingStreamAssignment{} - if err = json.Unmarshal(data, &bsa); err != nil { - return err - } - } - wsa.backingStreamAssignment = bsa - if unsupported || (wsa.Config != nil && !supportsRequiredApiLevel(wsa.Config.Metadata)) { - wsa.unsupportedJson = data - } - return nil + Client *ClientInfo `json:"client,omitempty"` + Created time.Time `json:"created"` + ConfigJSON json.RawMessage `json:"stream"` + Group *raftGroup `json:"group"` + Sync string `json:"sync"` + Consumers []*writeableConsumerAssignment } func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConfig, bool) { @@ -1604,19 +1548,13 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { streams := make([]writeableStreamAssignment, 0, nsa) for _, asa := range cc.streams { for _, sa := range asa { - if sa.unsupported != nil && sa.unsupported.json != nil { - streams = append(streams, writeableStreamAssignment{unsupportedJson: sa.unsupported.json}) - continue - } wsa := writeableStreamAssignment{ - backingStreamAssignment: backingStreamAssignment{ - Client: sa.Client.forAssignmentSnap(), - Created: sa.Created, - Config: sa.Config, - Group: sa.Group, - Sync: sa.Sync, - Consumers: make([]*writeableConsumerAssignment, 0, 
len(sa.consumers)), - }, + Client: sa.Client.forAssignmentSnap(), + Created: sa.Created, + ConfigJSON: sa.ConfigJSON, + Group: sa.Group, + Sync: sa.Sync, + Consumers: make([]*writeableConsumerAssignment, 0, len(sa.consumers)), } for _, ca := range sa.consumers { // Skip if the consumer is pending, we can't include it in our snapshot. @@ -1624,16 +1562,16 @@ func (js *jetStream) metaSnapshot() ([]byte, error) { if ca.pending { continue } - if ca.unsupported != nil && ca.unsupported.json != nil { - wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{unsupportedJson: ca.unsupported.json}) - nca++ - continue + wca := writeableConsumerAssignment{ + Client: ca.Client.forAssignmentSnap(), + Created: ca.Created, + Name: ca.Name, + Stream: ca.Stream, + ConfigJSON: ca.ConfigJSON, + Group: ca.Group, + State: ca.State, } - cca := *ca - cca.Stream = wsa.Config.Name // Needed for safe roll-backs. - cca.Client = cca.Client.forAssignmentSnap() - cca.Subject, cca.Reply = _EMPTY_, _EMPTY_ - wsa.Consumers = append(wsa.Consumers, &writeableConsumerAssignment{consumerAssignment: cca}) + wsa.Consumers = append(wsa.Consumers, &wca) nca++ } streams = append(streams, wsa) @@ -1685,30 +1623,25 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove // Build our new version here outside of js. 
streams := make(map[string]map[string]*streamAssignment) for _, wsa := range wsas { - fixCfgMirrorWithDedupWindow(wsa.Config) as := streams[wsa.Client.serviceAccount()] if as == nil { as = make(map[string]*streamAssignment) streams[wsa.Client.serviceAccount()] = as } - sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, Config: wsa.Config, Group: wsa.Group, Sync: wsa.Sync} - if wsa.unsupportedJson != nil { - sa.unsupported = newUnsupportedStreamAssignment(js.srv, sa, wsa.unsupportedJson) - } + sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, ConfigJSON: wsa.ConfigJSON, Group: wsa.Group, Sync: wsa.Sync} + decodeStreamAssignmentConfig(js.srv, sa) if len(wsa.Consumers) > 0 { sa.consumers = make(map[string]*consumerAssignment) for _, wca := range wsa.Consumers { if wca.Stream == _EMPTY_ { wca.Stream = sa.Config.Name // Rehydrate from the stream name. } - ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, Config: wca.Config, Group: wca.Group, Subject: wca.Subject, Reply: wca.Reply, State: wca.State} - if wca.unsupportedJson != nil { - ca.unsupported = newUnsupportedConsumerAssignment(ca, wca.unsupportedJson) - } + ca := &consumerAssignment{Client: wca.Client, Created: wca.Created, Name: wca.Name, Stream: wca.Stream, ConfigJSON: wca.ConfigJSON, Group: wca.Group, State: wca.State} + decodeConsumerAssignmentConfig(ca) sa.consumers[ca.Name] = ca } } - as[wsa.Config.Name] = sa + as[sa.Config.Name] = sa } js.mu.Lock() @@ -2710,7 +2643,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps batch := mset.batchApply mset.mu.RUnlock() if batch != nil { - mset.srv.Debugf("[batch] reject %s - empty entry", batch.id) batch.rejectBatchState(mset) } } @@ -2746,6 +2678,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps mset.retryMirrorConsumer() continue } + // If the error signals we timed out of a snapshot, we should try to replay the 
snapshot + // instead of fully resetting the state. Resetting the clustered state may result in + // race conditions and should only be used as a last effort attempt. + if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries { + if node := mset.raftNode(); node != nil && node.DrainAndReplaySnapshot() { + break + } + } // We will attempt to reset our cluster state. if mset.resetClusteredState(err) { aq.recycle(&ces) @@ -3096,10 +3036,16 @@ func (mset *stream) isMigrating() bool { // resetClusteredState is called when a clustered stream had an error (e.g sequence mismatch, bad snapshot) and needs to be reset. func (mset *stream) resetClusteredState(err error) bool { mset.mu.RLock() - s, js, jsa, sa, acc, node := mset.srv, mset.js, mset.jsa, mset.sa, mset.acc, mset.node + s, js, jsa, sa, acc, node, name := mset.srv, mset.js, mset.jsa, mset.sa, mset.acc, mset.node, mset.nameLocked(false) stype, tierName, replicas := mset.cfg.Storage, mset.tier, mset.cfg.Replicas mset.mu.RUnlock() + assert.Unreachable("Reset clustered state", map[string]any{ + "stream": name, + "account": acc.Name, + "err": err, + }) + // The stream might already be deleted and not assigned to us anymore. // In any case, don't revive the stream if it's already closed. if mset.closed.Load() { @@ -3408,7 +3354,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } panic(err.Error()) } - s, cc := js.server(), js.cluster + s := js.server() var removed bool if md.NoErase { @@ -3417,9 +3363,18 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco removed, err = mset.eraseMsg(md.Seq) } - // Cluster reset error. 
+ var isLeader bool + if node := mset.raftNode(); node != nil && node.Leader() { + isLeader = true + } + if err == ErrStoreEOF { - return 0, err + if isLeader && !isRecovering { + var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} + resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err)) + s.sendAPIErrResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp)) + } + continue } if err != nil && !isRecovering { @@ -3427,10 +3382,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco md.Seq, md.Client.serviceAccount(), md.Stream, err) } - js.mu.RLock() - isLeader := cc.isStreamLeader(md.Client.serviceAccount(), md.Stream) - js.mu.RUnlock() - if isLeader && !isRecovering { var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} if err != nil { @@ -3601,7 +3552,7 @@ func (mset *stream) skipBatchIfRecovering(batch *batchApply, buf []byte) (bool, // We can skip if we know this is less than what we already have. if lseq-clfs < last { - mset.srv.Debugf("[batch] Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d", + mset.srv.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d", mset.accountLocked(false), mset.nameLocked(false), lseq+1-clfs, last) // Check for any preAcks in case we are interest based. 
mset.clearAllPreAcks(lseq + 1 - clfs) @@ -4874,7 +4825,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Mark stream as unsupported as well if sa.unsupported == nil { - sa.unsupported = newUnsupportedStreamAssignment(s, sa, nil) + sa.unsupported = newUnsupportedStreamAssignment(s, sa) } sa.unsupported.setupInfoSub(s, sa) js.mu.Unlock() @@ -8021,6 +7972,7 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset func encodeAddStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(assignStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8030,6 +7982,7 @@ func encodeAddStreamAssignment(sa *streamAssignment) []byte { func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(updateStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8039,6 +7992,7 @@ func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { csa := *sa csa.Client = csa.Client.forProposal() + csa.ConfigJSON, _ = json.Marshal(sa.Config) var bb bytes.Buffer bb.WriteByte(byte(removeStreamOp)) json.NewEncoder(&bb).Encode(csa) @@ -8046,23 +8000,35 @@ func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { } func decodeStreamAssignment(s *Server, buf []byte) (*streamAssignment, error) { - var unsupported bool var sa streamAssignment - decoder := json.NewDecoder(bytes.NewReader(buf)) + if err := json.Unmarshal(buf, &sa); err != nil { + return nil, err + } + if err := decodeStreamAssignmentConfig(s, &sa); err != nil { + return nil, err + } + return &sa, nil +} + +func decodeStreamAssignmentConfig(s *Server, sa *streamAssignment) error { + var unsupported bool + var cfg StreamConfig + decoder := 
json.NewDecoder(bytes.NewReader(sa.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&sa); err != nil { + if err := decoder.Decode(&cfg); err != nil { unsupported = true - sa = streamAssignment{} - if err = json.Unmarshal(buf, &sa); err != nil { - return nil, err + cfg = StreamConfig{} + if err = json.Unmarshal(sa.ConfigJSON, &cfg); err != nil { + return err } } + sa.Config = &cfg fixCfgMirrorWithDedupWindow(sa.Config) if unsupported || (sa.Config != nil && !supportsRequiredApiLevel(sa.Config.Metadata)) { - sa.unsupported = newUnsupportedStreamAssignment(s, &sa, copyBytes(buf)) + sa.unsupported = newUnsupportedStreamAssignment(s, sa) } - return &sa, nil + return nil } func encodeDeleteRange(dr *DeleteRange) []byte { @@ -8480,6 +8446,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(assignConsumerOp)) json.NewEncoder(&bb).Encode(cca) @@ -8489,6 +8456,7 @@ func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(removeConsumerOp)) json.NewEncoder(&bb).Encode(cca) @@ -8496,27 +8464,39 @@ func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { } func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { - var unsupported bool var ca consumerAssignment - decoder := json.NewDecoder(bytes.NewReader(buf)) + if err := json.Unmarshal(buf, &ca); err != nil { + return nil, err + } + if err := decodeConsumerAssignmentConfig(&ca); err != nil { + return nil, err + } + return &ca, nil +} + +func decodeConsumerAssignmentConfig(ca *consumerAssignment) error { + var unsupported bool + var cfg 
ConsumerConfig + decoder := json.NewDecoder(bytes.NewReader(ca.ConfigJSON)) decoder.DisallowUnknownFields() - if err := decoder.Decode(&ca); err != nil { + if err := decoder.Decode(&cfg); err != nil { unsupported = true - ca = consumerAssignment{} - if err = json.Unmarshal(buf, &ca); err != nil { - return nil, err + cfg = ConsumerConfig{} + if err = json.Unmarshal(ca.ConfigJSON, &cfg); err != nil { + return err } } - + ca.Config = &cfg if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(buf)) + ca.unsupported = newUnsupportedConsumerAssignment(ca) } - return &ca, nil + return nil } func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { cca := *ca cca.Client = cca.Client.forProposal() + cca.ConfigJSON, _ = json.Marshal(ca.Config) var bb bytes.Buffer bb.WriteByte(byte(assignCompressedConsumerOp)) s2e := s2.NewWriter(&bb) @@ -8526,32 +8506,16 @@ func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { } func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { - var unsupported bool var ca consumerAssignment bb := bytes.NewBuffer(buf) s2d := s2.NewReader(bb) decoder := json.NewDecoder(s2d) - decoder.DisallowUnknownFields() if err := decoder.Decode(&ca); err != nil { - unsupported = true - ca = consumerAssignment{} - bb = bytes.NewBuffer(buf) - s2d = s2.NewReader(bb) - if err = json.NewDecoder(s2d).Decode(&ca); err != nil { - return nil, err - } + return nil, err } - - if unsupported || (ca.Config != nil && !supportsRequiredApiLevel(ca.Config.Metadata)) { - bb = bytes.NewBuffer(buf) - s2d = s2.NewReader(bb) - dec, err := io.ReadAll(s2d) - if err != nil { - return nil, err - } - ca.unsupported = newUnsupportedConsumerAssignment(&ca, copyBytes(dec)) + if err := decodeConsumerAssignmentConfig(&ca); err != nil { + return nil, err } - return &ca, nil } @@ -9176,6 +9140,14 @@ func (mset *stream) 
processSnapshot(snap *StreamReplicatedState, index uint64) ( qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name) mset.mu.Unlock() + // Always try to resume applies, we might be paused already if we timed out of processing the snapshot previously. + defer func() { + // Don't bother resuming if server or stream is gone. + if e != errCatchupStreamStopped && e != ErrServerNotRunning { + n.ResumeApply() + } + }() + // Bug that would cause this to be empty on stream update. if subject == _EMPTY_ { return errCatchupCorruptSnapshot @@ -9191,13 +9163,6 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) ( return err } - defer func() { - // Don't bother resuming if server or stream is gone. - if e != errCatchupStreamStopped && e != ErrServerNotRunning { - n.ResumeApply() - } - }() - // Set our catchup state. mset.setCatchingUp() defer mset.clearCatchingUp() diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index c75b697cee3..512296a9d6a 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -9617,8 +9617,10 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas := getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) // Update a stream that's unsupported. 
sjs.mu.Lock() @@ -9639,8 +9641,10 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) // Deleting a stream should always work, even if it is unsupported. require_NoError(t, js.DeleteStream("DowngradeStreamTest")) @@ -9724,14 +9728,18 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 2) for _, wca := range wsas[0].Consumers { - if wca.Config.Name == "DowngradeConsumerTest" { - require_Equal(t, wca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) + nca := &consumerAssignment{ConfigJSON: wca.ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + if nca.Config.Name == "DowngradeConsumerTest" { + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt-1)) } else { - require_Equal(t, wca.Config.Name, "consumer") + require_Equal(t, nca.Config.Name, "consumer") } } @@ -9754,14 +9762,18 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, 
len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 2) for _, wca := range wsas[0].Consumers { - if wca.Config.Name == "DowngradeConsumerTest" { - require_Equal(t, wca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nca := &consumerAssignment{ConfigJSON: wca.ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + if nca.Config.Name == "DowngradeConsumerTest" { + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) } else { - require_Equal(t, wca.Config.Name, "consumer") + require_Equal(t, nca.Config.Name, "consumer") } } @@ -9771,10 +9783,14 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate(t *tes wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 1) - require_Equal(t, wsas[0].Consumers[0].Config.Name, "consumer") + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, nca.Config.Name, "consumer") } func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { @@ -9892,7 +9908,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas := 
getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") // Update a stream to be unsupported. sjs.mu.Lock() @@ -9913,7 +9931,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeStreamTest") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeStreamTest") // Deleting a stream should always work, even if it is unsupported. require_NoError(t, js.DeleteStream("DowngradeStreamTest")) @@ -9927,8 +9947,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { sjs.mu.Lock() ccfg := &ConsumerConfig{ - Name: "DowngradeConsumerTest", - Replicas: 3, + Name: "DowngradeConsumerTest", + Replicas: 3, + MaxWaiting: JSWaitQueueDefaultMax, } rg = cc.createGroupForConsumer(ccfg, sa) ca := &consumerAssignment{ @@ -9974,7 +9995,9 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") require_Len(t, len(wsas[0].Consumers), 1) // Update a consumer to be unsupported. 
@@ -9996,10 +10019,99 @@ func TestJetStreamClusterOfflineStreamAndConsumerAfterDowngrade(t *testing.T) { wsas = getValidMetaSnapshot() require_Len(t, len(wsas), 1) - require_Equal(t, wsas[0].Config.Name, "DowngradeConsumerTest") - require_Equal(t, wsas[0].Config.Metadata["_nats.req.level"], "0") + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeConsumerTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") require_Len(t, len(wsas[0].Consumers), 1) - require_Equal(t, wsas[0].Consumers[0].Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) +} + +func TestJetStreamClusterOfflineStreamAndConsumerUpdate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "DowngradeTest", Replicas: 3}) + require_NoError(t, err) + _, err = js.AddConsumer("DowngradeTest", &nats.ConsumerConfig{Durable: "D", Replicas: 3}) + require_NoError(t, err) + c.waitOnAllCurrent() + + ml := c.leader() + require_NotNil(t, ml) + sjs := ml.getJetStream() + require_NotNil(t, sjs) + + var sa *streamAssignment + var ca *consumerAssignment + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + sjs.mu.Lock() + defer sjs.mu.Unlock() + sa = sjs.streamAssignment(globalAccountName, "DowngradeTest") + if sa == nil { + return errors.New("stream assignment missing") + } + ca = sjs.consumerAssignment(globalAccountName, "DowngradeTest", "D") + if ca == nil { + return errors.New("consumer assignment missing") + } + return nil + }) + + getValidMetaSnapshot := func() (wsas []writeableStreamAssignment) 
{ + t.Helper() + snap, err := sjs.metaSnapshot() + require_NoError(t, err) + require_True(t, len(snap) > 0) + dec, err := s2.Decode(nil, snap) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(dec, &wsas)) + return wsas + } + + wsas := getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + nsa := &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], "0") + require_Len(t, len(wsas[0].Consumers), 1) + nca := &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, wsas[0].Consumers[0].Name, "D") + require_Equal(t, nca.Config.Metadata["_nats.req.level"], "0") + + // Update a consumer to be unsupported. + sjs.mu.Lock() + ca.Config.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + err = sjs.cluster.meta.Propose(encodeAddConsumerAssignment(ca)) + sjs.mu.Unlock() + require_NoError(t, err) + + // Update the stream to be unsupported. 
+ sjs.mu.Lock() + sa.Config.Metadata = map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt)} + err = sjs.cluster.meta.Propose(encodeUpdateStreamAssignment(sa)) + sjs.mu.Unlock() + require_NoError(t, err) + c.waitOnAllCurrent() + + wsas = getValidMetaSnapshot() + require_Len(t, len(wsas), 1) + nsa = &streamAssignment{ConfigJSON: wsas[0].ConfigJSON} + require_NoError(t, decodeStreamAssignmentConfig(ml, nsa)) + require_Equal(t, nsa.Config.Name, "DowngradeTest") + require_Equal(t, nsa.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) + require_Len(t, len(wsas[0].Consumers), 1) + nca = &consumerAssignment{ConfigJSON: wsas[0].Consumers[0].ConfigJSON} + require_NoError(t, decodeConsumerAssignmentConfig(nca)) + require_Equal(t, wsas[0].Consumers[0].Name, "D") + require_Equal(t, nca.Config.Metadata["_nats.req.level"], strconv.Itoa(math.MaxInt)) } func TestJetStreamClusterOfflineStreamAndConsumerStrictDecoding(t *testing.T) { @@ -10007,31 +10119,28 @@ func TestJetStreamClusterOfflineStreamAndConsumerStrictDecoding(t *testing.T) { defer s.Shutdown() unsupportedJson := []byte("{\"unknown\": true}") + unsupportedStreamJson := []byte(fmt.Sprintf("{\"stream\":%s}", unsupportedJson)) + unsupportedConsumerJson := []byte(fmt.Sprintf("{\"consumer\":%s}", unsupportedJson)) - sa, err := decodeStreamAssignment(s, unsupportedJson) + sa, err := decodeStreamAssignment(s, unsupportedStreamJson) require_NoError(t, err) - require_True(t, bytes.Equal(sa.unsupported.json, unsupportedJson)) + require_True(t, bytes.Equal(sa.ConfigJSON, unsupportedJson)) + require_True(t, sa.unsupported != nil) - ca, err := decodeConsumerAssignment(unsupportedJson) + ca, err := decodeConsumerAssignment(unsupportedConsumerJson) require_NoError(t, err) - require_True(t, bytes.Equal(ca.unsupported.json, unsupportedJson)) + require_True(t, bytes.Equal(ca.ConfigJSON, unsupportedJson)) + require_True(t, ca.unsupported != nil) var bb bytes.Buffer s2e := s2.NewWriter(&bb) - _, err = 
s2e.Write(unsupportedJson) + _, err = s2e.Write(unsupportedConsumerJson) require_NoError(t, err) require_NoError(t, s2e.Close()) ca, err = decodeConsumerAssignmentCompressed(bb.Bytes()) require_NoError(t, err) - require_True(t, bytes.Equal(ca.unsupported.json, unsupportedJson)) - - var wsa writeableStreamAssignment - require_NoError(t, wsa.UnmarshalJSON(unsupportedJson)) - require_True(t, bytes.Equal(wsa.unsupportedJson, unsupportedJson)) - - var wca writeableConsumerAssignment - require_NoError(t, wca.UnmarshalJSON(unsupportedJson)) - require_True(t, bytes.Equal(wca.unsupportedJson, unsupportedJson)) + require_True(t, bytes.Equal(ca.ConfigJSON, unsupportedJson)) + require_True(t, ca.unsupported != nil) } func TestJetStreamClusterStreamMonitorShutdownWithoutRaftNode(t *testing.T) { @@ -10194,6 +10303,33 @@ func TestJetStreamClusterPersistModeAsync(t *testing.T) { require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("async persist mode is not supported on replicated streams"))) } +func TestJetStreamClusterDeleteMsgEOF(t *testing.T) { + for _, replicas := range []int{1, 3} { + t.Run(fmt.Sprintf("R%d", replicas), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: replicas, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + require_Error(t, js.DeleteMsg("TEST", 0), NewJSNoMessageFoundError()) + require_NoError(t, js.DeleteMsg("TEST", 1)) + require_Error(t, js.DeleteMsg("TEST", 1), NewJSNoMessageFoundError()) + require_Error(t, js.DeleteMsg("TEST", 2), NewJSStreamMsgDeleteFailedError(ErrStoreEOF)) + }) + } +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. 
diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 9928bd4cb38..cb7883d277d 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -9634,6 +9634,61 @@ func TestJetStreamConsumerDeliverAllNonOverlappingFilterSubjects(t *testing.T) { require_Equal(t, i.NumPending, 0) } +// https://github.com/nats-io/nats-server/issues/7336 +func TestJetStreamConsumerDeliverPartialOverlappingFilterSubjects(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnectNewAPI(t, s) + defer nc.Close() + + ctx := context.Background() + _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"stream.>"}, + }) + require_NoError(t, err) + + publishMessageCount := 10 + for i := 0; i < publishMessageCount; i++ { + _, err = js.Publish(ctx, "stream.A", nil) + require_NoError(t, err) + } + + // Create consumer + consumer, err := js.CreateOrUpdateConsumer(ctx, "TEST", jetstream.ConsumerConfig{ + DeliverPolicy: jetstream.DeliverAllPolicy, + FilterSubjects: []string{ + "stream.A", + "stream.*.A", + }, + }) + require_NoError(t, err) + + messages := make(chan jetstream.Msg) + cc, err := consumer.Consume(func(msg jetstream.Msg) { + messages <- msg + msg.Ack() + }) + require_NoError(t, err) + defer cc.Drain() + + var count = 0 + for { + if count == publishMessageCount { + // All messages received. + return + } + select { + case <-messages: + count++ + case <-time.After(2 * time.Second): + t.Errorf("Timeout reached, %d messages received. 
Exiting.", count) + return + } + } +} + func TestJetStreamConsumerStateAlwaysFromStore(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index a1ca08cfd09..dbc7f0499d7 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -14,6 +14,9 @@ const ( // JSAtomicPublishIncompleteBatchErr atomic publish batch is incomplete JSAtomicPublishIncompleteBatchErr ErrorIdentifier = 10176 + // JSAtomicPublishInvalidBatchCommitErr atomic publish batch commit is invalid + JSAtomicPublishInvalidBatchCommitErr ErrorIdentifier = 10200 + // JSAtomicPublishInvalidBatchIDErr atomic publish batch ID is invalid JSAtomicPublishInvalidBatchIDErr ErrorIdentifier = 10179 @@ -605,6 +608,7 @@ var ( JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"}, JSAtomicPublishDisabledErr: {Code: 400, ErrCode: 10174, Description: "atomic publish is disabled"}, JSAtomicPublishIncompleteBatchErr: {Code: 400, ErrCode: 10176, Description: "atomic publish batch is incomplete"}, + JSAtomicPublishInvalidBatchCommitErr: {Code: 400, ErrCode: 10200, Description: "atomic publish batch commit is invalid"}, JSAtomicPublishInvalidBatchIDErr: {Code: 400, ErrCode: 10179, Description: "atomic publish batch ID is invalid"}, JSAtomicPublishMissingSeqErr: {Code: 400, ErrCode: 10175, Description: "atomic publish sequence is missing"}, JSAtomicPublishTooLargeBatchErrF: {Code: 400, ErrCode: 10199, Description: "atomic publish batch is too large: {size}"}, @@ -855,6 +859,16 @@ func NewJSAtomicPublishIncompleteBatchError(opts ...ErrorOption) *ApiError { return ApiErrors[JSAtomicPublishIncompleteBatchErr] } +// NewJSAtomicPublishInvalidBatchCommitError creates a new JSAtomicPublishInvalidBatchCommitErr error: "atomic publish batch commit is invalid" +func NewJSAtomicPublishInvalidBatchCommitError(opts ...ErrorOption) *ApiError { + 
eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSAtomicPublishInvalidBatchCommitErr] +} + // NewJSAtomicPublishInvalidBatchIDError creates a new JSAtomicPublishInvalidBatchIDErr error: "atomic publish batch ID is invalid" func NewJSAtomicPublishInvalidBatchIDError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 27ddcde2a98..cf1ba2211cb 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22020,3 +22020,237 @@ func TestJetStreamPersistModeAsync(t *testing.T) { _, err = jsStreamUpdate(t, nc, cfg) require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change persist mode"))) } + +func TestJetStreamRemoveTTLOnRemoveMsg(t *testing.T) { + for _, storageType := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} { + t.Run(storageType.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: storageType, + AllowMsgTTL: true, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil, nats.MsgTTL(time.Hour)) + require_NoError(t, err) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + validateTTLCount := func(count uint64) { + store := mset.Store() + switch storageType { + case nats.FileStorage: + fs := store.(*fileStore) + fs.mu.RLock() + defer fs.mu.RUnlock() + require_Equal(t, fs.ttls.Count(), count) + case nats.MemoryStorage: + ms := store.(*memStore) + ms.mu.RLock() + defer ms.mu.RUnlock() + require_Equal(t, ms.ttls.Count(), count) + } + } + validateTTLCount(1) + + require_NoError(t, js.DeleteMsg("TEST", 1)) + validateTTLCount(0) + }) + } +} + +func TestJetStreamMessageTTLNotExpiring(t *testing.T) { + for _, storageType := range 
[]nats.StorageType{nats.FileStorage, nats.MemoryStorage} { + t.Run(storageType.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: storageType, + AllowMsgTTL: true, + }) + require_NoError(t, err) + + // Triggers the expiry timer once, and needs to be reset to trigger earlier. + _, err = js.Publish("foo", nil, nats.MsgTTL(time.Hour)) + require_NoError(t, err) + + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + // Storing messages with a TTL would continuously reset the timer. + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(100 * time.Millisecond): + ttl := time.Hour.Nanoseconds() + store := mset.Store() + store.StoreMsg("foo", nil, nil, ttl) + } + } + }() + + // The message should be expired timely. 
+ pubAck, err := js.Publish("foo", nil, nats.MsgTTL(time.Second)) + require_NoError(t, err) + checkFor(t, 3*time.Second, 100*time.Millisecond, func() error { + _, err = js.GetMsg("TEST", pubAck.Sequence) + if err == nil { + return fmt.Errorf("message not removed yet") + } + if !errors.Is(err, nats.ErrMsgNotFound) { + return err + } + return nil + }) + }) + } +} + +func TestJetStreamScheduledMessageNotTriggering(t *testing.T) { + for _, storageType := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storageType.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>"}, + Storage: storageType, + AllowMsgSchedules: true, + }) + require_NoError(t, err) + + delay := func(d time.Duration) string { + return fmt.Sprintf("@at %s", time.Now().Add(d).Format(time.RFC3339Nano)) + } + + // Triggers the schedule timer once, and needs to be reset to trigger earlier. + m := nats.NewMsg("foo.schedule.first") + m.Header.Set("Nats-Schedule", delay(time.Hour)) + m.Header.Set("Nats-Schedule-Target", "foo.msg") + _, err = js.PublishMsg(m) + require_NoError(t, err) + + // Storing messages with a schedule would continuously reset the timer. + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer wg.Done() + var i int + for { + select { + case <-ctx.Done(): + return + case <-time.After(100 * time.Millisecond): + i++ + ms := nats.NewMsg(fmt.Sprintf("foo.schedule.%d", i)) + ms.Header.Set("Nats-Schedule", delay(time.Hour)) + ms.Header.Set("Nats-Schedule-Target", "foo.msg") + js.PublishMsg(ms) + } + } + }() + + // The message should be scheduled timely. 
+ m = nats.NewMsg("foo.schedule.validate") + m.Header.Set("Nats-Schedule", delay(time.Second)) + m.Header.Set("Nats-Schedule-Target", "foo.msg") + _, err = js.PublishMsg(m) + require_NoError(t, err) + pubAck, err := js.PublishMsg(m) + require_NoError(t, err) + checkFor(t, 3*time.Second, 100*time.Millisecond, func() error { + _, err = js.GetMsg("TEST", pubAck.Sequence) + if err == nil { + return fmt.Errorf("message not removed yet") + } + if !errors.Is(err, nats.ErrMsgNotFound) { + return err + } + return nil + }) + }) + } +} + +func TestJetStreamScheduledMessageNotDeactivated(t *testing.T) { + for _, storageType := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storageType.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>"}, + Storage: storageType, + AllowMsgSchedules: true, + }) + require_NoError(t, err) + + delay := func(d time.Duration) string { + return fmt.Sprintf("@at %s", time.Now().Add(d).Format(time.RFC3339Nano)) + } + + // Message should be scheduled. + m := nats.NewMsg("foo.schedule") + m.Header.Set("Nats-Schedule", delay(time.Second)) + m.Header.Set("Nats-Schedule-Target", "foo.msg1") + _, err = js.PublishMsg(m) + require_NoError(t, err) + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + _, err = js.GetLastMsg("TEST", "foo.msg1") + if err != nil { + return err + } + return nil + }) + + // A message with a schedule is published. + m = nats.NewMsg("foo.schedule") + m.Header.Set("Nats-Schedule", delay(time.Second)) + m.Header.Set("Nats-Schedule-Target", "foo.msg2") + _, err = js.PublishMsg(m) + require_NoError(t, err) + + // But, a publish that is not a schedule should deactivate it. + _, err = js.Publish("foo.schedule", nil) + require_NoError(t, err) + + // Wait for some time, and confirm the schedule wasn't triggered. 
+ time.Sleep(1500 * time.Millisecond) + _, err = js.GetLastMsg("TEST", "foo.msg2") + require_Error(t, err, nats.ErrMsgNotFound) + }) + } +} diff --git a/server/leafnode.go b/server/leafnode.go index c85bdfe173f..71491f77a55 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -76,7 +76,7 @@ type leaf struct { isSpoke bool // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster. remoteCluster string - // remoteServer holds onto the remove server's name or ID. + // remoteServer holds onto the remote server's name or ID. remoteServer string // domain name of remote server remoteDomain string @@ -2040,7 +2040,7 @@ func (s *Server) checkInternalSyncConsumers(acc *Account) { // We will check all streams in our local account. They must be a leader and // be sourcing or mirroring. We will check the external config on the stream itself // if this is cross domain, or if the remote domain is empty, meaning we might be - // extedning the system across this leafnode connection and hence we would be extending + // extending the system across this leafnode connection and hence we would be extending // our own domain. jsa := js.lookupAccount(acc) if jsa == nil { diff --git a/server/log.go b/server/log.go index 9baa31dc654..2cd294c457d 100644 --- a/server/log.go +++ b/server/log.go @@ -245,6 +245,10 @@ func (s *Server) RateLimitDebugf(format string, v ...any) { // Fatalf logs a fatal error func (s *Server) Fatalf(format string, v ...any) { + if s.isShuttingDown() { + s.Errorf(format, v...) + return + } s.executeLogCall(func(logger Logger, format string, v ...any) { logger.Fatalf(format, v...) }, format, v...) diff --git a/server/memstore.go b/server/memstore.go index f75146c6ab3..c23ba638032 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -42,7 +42,9 @@ type memStore struct { scb StorageUpdateHandler rmcb StorageRemoveMsgHandler pmsgcb ProcessJetStreamMsgHandler - ageChk *time.Timer + ageChk *time.Timer // Timer to expire messages. 
+ ageChkRun bool // Whether message expiration is currently running. + ageChkTime int64 // When the message expiration is scheduled to run. consumers int receivedAny bool ttls *thw.HashWheel @@ -113,6 +115,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { if ms.ageChk != nil && ms.cfg.MaxAge == 0 { ms.ageChk.Stop() ms.ageChk = nil + ms.ageChkTime = 0 } // Make sure to update MaxMsgsPer if cfg.MaxMsgsPer < -1 { @@ -309,6 +312,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, tt if ms.scheduling != nil { if schedule, ok := getMessageSchedule(hdr); ok && !schedule.IsZero() { ms.scheduling.add(seq, subj, schedule.UnixNano()) + } else { + ms.scheduling.removeSubject(subj) } } @@ -1062,6 +1067,12 @@ func (ms *memStore) startAgeChk() { // Lock should be held. func (ms *memStore) resetAgeChk(delta int64) { + // If we're already expiring messages, it will make sure to reset. + // Don't trigger again, as that could result in many expire goroutines. + if ms.ageChkRun { + return + } + var next int64 = math.MaxInt64 if ms.ttls != nil { next = ms.ttls.GetNextExpiration(next) @@ -1107,6 +1118,14 @@ func (ms *memStore) resetAgeChk(delta int64) { fireIn = 250 * time.Millisecond } + // If we want to kick the timer to run later than what was assigned before, don't reset it. + // Otherwise, we could get in a situation where the timer is continuously reset, and it never runs. 
+ expires := ats.AccessTime() + fireIn.Nanoseconds() + if ms.ageChkTime > 0 && expires > ms.ageChkTime { + return + } + + ms.ageChkTime = expires if ms.ageChk != nil { ms.ageChk.Reset(fireIn) } else { @@ -1119,6 +1138,7 @@ func (ms *memStore) cancelAgeChk() { if ms.ageChk != nil { ms.ageChk.Stop() ms.ageChk = nil + ms.ageChkTime = 0 } } @@ -1126,17 +1146,22 @@ func (ms *memStore) cancelAgeChk() { func (ms *memStore) expireMsgs() { var smv StoreMsg var sm *StoreMsg - ms.mu.RLock() + ms.mu.Lock() maxAge := int64(ms.cfg.MaxAge) minAge := time.Now().UnixNano() - maxAge rmcb := ms.rmcb pmsgcb := ms.pmsgcb sdmTTL := int64(ms.cfg.SubjectDeleteMarkerTTL.Seconds()) sdmEnabled := sdmTTL > 0 - ms.mu.RUnlock() + + // If SDM is enabled, but handlers aren't set up yet. Try again later. if sdmEnabled && (rmcb == nil || pmsgcb == nil) { + ms.resetAgeChk(0) + ms.mu.Unlock() return } + ms.ageChkRun = true + ms.mu.Unlock() if maxAge > 0 { var seq uint64 @@ -1150,7 +1175,7 @@ func (ms *memStore) expireMsgs() { } if sdmEnabled { if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok { - sdm := last && isSubjectDeleteMarker(sm.hdr) + sdm := last && !isSubjectDeleteMarker(sm.hdr) ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL) } } else { @@ -1168,27 +1193,13 @@ func (ms *memStore) expireMsgs() { // TODO: Not great that we're holding the lock here, but the timed hash wheel isn't thread-safe. nextTTL := int64(math.MaxInt64) - var rmSeqs []uint64 - var ttlSdm map[string][]SDMBySubj + var rmSeqs []thw.HashWheelEntry if ms.ttls != nil { ms.ttls.ExpireTasks(func(seq uint64, ts int64) bool { - if sdmEnabled { - // Need to grab subject for the specified sequence, and check - // if the message hasn't been removed in the meantime. 
- sm, _ = ms.loadMsgLocked(seq, &smv, false) - if sm != nil { - if ttlSdm == nil { - ttlSdm = make(map[string][]SDMBySubj, 1) - } - ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)}) - return false - } - } else { - // Collect sequences to remove. Don't remove messages inline here, - // as that releases the lock and THW is not thread-safe. - rmSeqs = append(rmSeqs, seq) - } - return true + rmSeqs = append(rmSeqs, thw.HashWheelEntry{Seq: seq, Expires: ts}) + // We might need to remove messages out of band, those can fail, and we can be shutdown halfway + // through so don't remove from THW just yet. + return false }) if maxAge > 0 { // Only check if we're expiring something in the next MaxAge interval, saves us a bit @@ -1200,34 +1211,46 @@ func (ms *memStore) expireMsgs() { } // Remove messages collected by THW. - for _, seq := range rmSeqs { - ms.removeMsg(seq, false) - } - - // THW is unordered, so must sort by sequence and must not be holding the lock. - if len(ttlSdm) > 0 { + if !sdmEnabled { + for _, rm := range rmSeqs { + ms.removeMsg(rm.Seq, false) + } + } else { + // THW is unordered, so must sort by sequence and must not be holding the lock. ms.mu.Unlock() - for subj, es := range ttlSdm { - slices.SortFunc(es, func(a, b SDMBySubj) int { - if a.seq == b.seq { - return 0 - } else if a.seq < b.seq { - return -1 - } else { - return 1 - } - }) - for _, e := range es { - if last, ok := ms.shouldProcessSdm(e.seq, subj); ok { - sdm := last && !e.sdm - ms.handleRemovalOrSdm(e.seq, subj, sdm, sdmTTL) - } + slices.SortFunc(rmSeqs, func(a, b thw.HashWheelEntry) int { + if a.Seq == b.Seq { + return 0 + } else if a.Seq < b.Seq { + return -1 + } else { + return 1 + } + }) + for _, rm := range rmSeqs { + // Need to grab subject for the specified sequence if for SDM, and check + // if the message hasn't been removed in the meantime. 
+ // We need to grab the message and check if we should process SDM while holding the lock, + // otherwise we can race if a deletion of this message is in progress. + ms.mu.Lock() + sm, _ = ms.loadMsgLocked(rm.Seq, &smv, false) + if sm == nil { + ms.ttls.Remove(rm.Seq, rm.Expires) + ms.mu.Unlock() + continue + } + last, ok := ms.shouldProcessSdmLocked(rm.Seq, sm.subj) + ms.mu.Unlock() + if ok { + sdm := last && !isSubjectDeleteMarker(sm.hdr) + ms.handleRemovalOrSdm(rm.Seq, sm.subj, sdm, sdmTTL) } } ms.mu.Lock() } // Only cancel if no message left, not on potential lookup error that would result in sm == nil. + ms.ageChkRun, ms.ageChkTime = false, 0 if ms.state.Msgs == 0 && nextTTL == math.MaxInt64 { ms.cancelAgeChk() } else { @@ -1242,12 +1265,20 @@ func (ms *memStore) expireMsgs() { func (ms *memStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) { ms.mu.Lock() defer ms.mu.Unlock() + return ms.shouldProcessSdmLocked(seq, subj) +} +// Lock should be held. +func (ms *memStore) shouldProcessSdmLocked(seq uint64, subj string) (bool, bool) { if ms.sdm == nil { ms.sdm = newSDMMeta() } if p, ok := ms.sdm.pending[seq]; ok { + // Don't allow more proposals for the same sequence if we already did recently. + if time.Since(time.Unix(0, p.ts)) < 2*time.Second { + return p.last, false + } // If we're about to use the cached value, and we knew it was last before, // quickly check that we don't have more remaining messages for the subject now. // Which means we are not the last anymore and must reset to not remove later data. @@ -1258,11 +1289,6 @@ func (ms *memStore) shouldProcessSdm(seq uint64, subj string) (bool, bool) { p.last = false } } - - // Don't allow more proposals for the same sequence if we already did recently. 
- if time.Since(time.Unix(0, p.ts)) < 2*time.Second { - return p.last, false - } ms.sdm.pending[seq] = SDMBySeq{p.last, time.Now().UnixNano()} return p.last, true } @@ -1302,9 +1328,15 @@ func (ms *memStore) runMsgScheduling() { ms.mu.Lock() defer ms.mu.Unlock() - if ms.scheduling == nil || ms.pmsgcb == nil { + // If scheduling is enabled, but handler isn't set up yet. Try again later. + if ms.scheduling == nil { + return + } + if ms.pmsgcb == nil { + ms.scheduling.resetTimer() return } + ms.scheduling.running = true scheduledMsgs := ms.scheduling.getScheduledMessages(func(seq uint64, smv *StoreMsg) *StoreMsg { sm, _ := ms.loadMsgLocked(seq, smv, false) @@ -1318,9 +1350,8 @@ func (ms *memStore) runMsgScheduling() { ms.mu.Lock() } - if ms.scheduling != nil { - ms.scheduling.resetTimer() - } + ms.scheduling.running, ms.scheduling.deadline = false, 0 + ms.scheduling.resetTimer() } // PurgeEx will remove messages based on subject filters, sequence and number of messages to keep. @@ -1933,6 +1964,15 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ms.dmap.Insert(seq) ms.updateFirstSeq(seq) + // Remove any per subject tracking. + ms.removeSeqPerSubject(sm.subj, seq) + if ms.ttls != nil { + if ttl, err := getMessageTTL(sm.hdr); err == nil { + expires := time.Duration(sm.ts) + (time.Second * time.Duration(ttl)) + ms.ttls.Remove(seq, int64(expires)) + } + } + if secure { if len(sm.hdr) > 0 { sm.hdr = make([]byte, len(sm.hdr)) @@ -1945,9 +1985,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { sm.seq, sm.ts = 0, 0 } - // Remove any per subject tracking. - ms.removeSeqPerSubject(sm.subj, seq) - // Must delete message after updating per-subject info, to be consistent with file store. 
delete(ms.msgs, seq) @@ -2056,6 +2093,7 @@ func (ms *memStore) Stop() error { if ms.ageChk != nil { ms.ageChk.Stop() ms.ageChk = nil + ms.ageChkTime = 0 } ms.msgs = nil ms.mu.Unlock() diff --git a/server/raft.go b/server/raft.go index 69f09f9c3ad..ee93a80142a 100644 --- a/server/raft.go +++ b/server/raft.go @@ -76,6 +76,7 @@ type RaftNode interface { ApplyQ() *ipQueue[*CommittedEntry] PauseApply() error ResumeApply() + DrainAndReplaySnapshot() bool LeadChangeC() <-chan bool QuitC() <-chan struct{} Created() time.Time @@ -1040,10 +1041,13 @@ func (n *raft) PauseApply() error { if n.State() == Leader { return errAlreadyLeader } - n.Lock() defer n.Unlock() + n.pauseApplyLocked() + return nil +} +func (n *raft) pauseApplyLocked() { // If we are currently a candidate make sure we step down. if n.State() == Candidate { n.stepdownLocked(noLeader) @@ -1051,11 +1055,11 @@ func (n *raft) PauseApply() error { n.debug("Pausing our apply channel") n.paused = true - n.hcommit = n.commit + if n.hcommit < n.commit { + n.hcommit = n.commit + } // Also prevent us from trying to become a leader while paused and catching up. n.resetElect(observerModeInterval) - - return nil } // ResumeApply will resume sending applies to the external apply queue. This @@ -1107,6 +1111,25 @@ func (n *raft) ResumeApply() { } } +// DrainAndReplaySnapshot will drain the apply queue and replay the snapshot. +// Our highest known commit will be preserved by pausing applies. The caller +// should make sure to call ResumeApply() when handling the snapshot from the +// queue, which will populate the rest of the committed entries in the queue. 
+func (n *raft) DrainAndReplaySnapshot() bool { + n.Lock() + defer n.Unlock() + n.warn("Draining and replaying snapshot") + snap, err := n.loadLastSnapshot() + if err != nil { + return false + } + n.pauseApplyLocked() + n.apply.drain() + n.commit = snap.lastIndex + n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) + return true +} + // Applied is a callback that must be called by the upper layer when it // has successfully applied the committed entries that it received from the // apply queue. It will return the number of entries and an estimation of the diff --git a/server/raft_test.go b/server/raft_test.go index ab704357e5c..27125e368de 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -3535,6 +3535,91 @@ func TestNRGSendAppendEntryNotLeader(t *testing.T) { require_True(t, msg == nil) } +func TestNRGDrainAndReplaySnapshot(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + aeMsg4 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 4, pterm: 1, pindex: 4, entries: nil}) + + // Stage some entries as normal. 
+ require_Len(t, n.apply.len(), 0) + n.processAppendEntry(aeMsg1, n.aesub) + n.processAppendEntry(aeMsg2, n.aesub) + n.processAppendEntry(aeMsg3, n.aesub) + n.processAppendEntry(aeHeartbeat1, n.aesub) + require_Equal(t, n.pindex, 3) + require_Len(t, n.apply.len(), 3) + require_Equal(t, n.commit, 3) + require_Equal(t, n.hcommit, 0) + + // Just a sanity-check, if we have no snapshot then this should fail. + require_False(t, n.DrainAndReplaySnapshot()) + require_Len(t, n.apply.len(), 3) + require_Equal(t, n.commit, 3) + require_Equal(t, n.hcommit, 0) + + // Simulate this server processing a snapshot that requires upper layer catchup. + // This catchup timed out and we would then call into DrainAndReplaySnapshot. + snap := []byte("snapshot") + n.Applied(1) + require_NoError(t, n.InstallSnapshot(snap)) + + require_True(t, n.DrainAndReplaySnapshot()) + require_True(t, n.paused) + require_Len(t, n.apply.len(), 1) + require_Equal(t, n.commit, 1) + require_Equal(t, n.hcommit, 3) + + // Simulate snapshot processing being successful and restoring the apply queue when resuming. + n.ResumeApply() + require_False(t, n.paused) + require_Len(t, n.apply.len(), 3) + require_Equal(t, n.commit, 3) + require_Equal(t, n.hcommit, 0) + + // Now simulate another case where the snapshot processing times out multiple times. + require_True(t, n.DrainAndReplaySnapshot()) + require_True(t, n.paused) + require_Len(t, n.apply.len(), 1) + require_Equal(t, n.commit, 1) + require_Equal(t, n.hcommit, 3) + + // Could receive new messages in the meantime and need to keep tracking the highest known commit properly. + n.processAppendEntry(aeMsg4, n.aesub) + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.pindex, 4) + require_True(t, n.paused) + require_Len(t, n.apply.len(), 1) + require_Equal(t, n.commit, 1) + require_Equal(t, n.hcommit, 4) + + // Replaying again should preserve the highest known commit. 
+ require_True(t, n.DrainAndReplaySnapshot()) + require_True(t, n.paused) + require_Len(t, n.apply.len(), 1) + require_Equal(t, n.commit, 1) + require_Equal(t, n.hcommit, 4) + + // Resume applies, and ensure correct state. + n.ResumeApply() + require_False(t, n.paused) + require_Len(t, n.apply.len(), 4) + require_Equal(t, n.commit, 4) + require_Equal(t, n.hcommit, 0) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: diff --git a/server/scheduler.go b/server/scheduler.go index fc626471b15..d827c39451e 100644 --- a/server/scheduler.go +++ b/server/scheduler.go @@ -35,6 +35,8 @@ type MsgScheduling struct { run func() ttls *thw.HashWheel timer *time.Timer + running bool + deadline int64 schedules map[string]*MsgSchedule seqToSubj map[uint64]string inflight map[string]struct{} @@ -93,11 +95,25 @@ func (ms *MsgScheduling) remove(seq uint64) { } } +func (ms *MsgScheduling) removeSubject(subj string) { + if sched, ok := ms.schedules[subj]; ok { + ms.ttls.Remove(sched.seq, sched.ts) + delete(ms.schedules, subj) + delete(ms.seqToSubj, sched.seq) + } +} + func (ms *MsgScheduling) clearInflight() { ms.inflight = make(map[string]struct{}) } func (ms *MsgScheduling) resetTimer() { + // If we're already scheduling messages, it will make sure to reset. + // Don't trigger again, as that could result in many expire goroutines. + if ms.running { + return + } + next := ms.ttls.GetNextExpiration(math.MaxInt64) if next == math.MaxInt64 { clearTimer(&ms.timer) @@ -111,6 +127,14 @@ func (ms *MsgScheduling) resetTimer() { fireIn = 250 * time.Millisecond } + // If we want to kick the timer to run later than what was assigned before, don't reset it. + // Otherwise, we could get in a situation where the timer is continuously reset, and it never runs. 
+ deadline := time.Now().UnixNano() + fireIn.Nanoseconds() + if ms.deadline > 0 && deadline > ms.deadline { + return + } + + ms.deadline = deadline if ms.timer != nil { ms.timer.Reset(fireIn) } else { diff --git a/server/sdm.go b/server/sdm.go index 88a1be4e494..dfb2a75be09 100644 --- a/server/sdm.go +++ b/server/sdm.go @@ -30,12 +30,6 @@ type SDMBySeq struct { ts int64 // Last timestamp we proposed a removal/sdm. } -// SDMBySubj holds whether a message for a specific subject and sequence was a subject delete marker or not. -type SDMBySubj struct { - seq uint64 - sdm bool -} - func newSDMMeta() *SDMMeta { return &SDMMeta{ totals: make(map[string]uint64, 1), @@ -46,7 +40,7 @@ func newSDMMeta() *SDMMeta { // isSubjectDeleteMarker returns whether the headers indicate this message is a subject delete marker. // Either it's a usual marker with JSMarkerReason, or it's a KV Purge marker as the KVOperation. func isSubjectDeleteMarker(hdr []byte) bool { - return len(sliceHeader(JSMarkerReason, hdr)) == 0 && !bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge) + return len(sliceHeader(JSMarkerReason, hdr)) != 0 || bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge) } // empty clears all data. diff --git a/server/stream.go b/server/stream.go index 7dbc7d2b4e3..1bf64daba42 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6264,7 +6264,6 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } return err } - commit := len(sliceHeader(JSBatchCommit, hdr)) != 0 mset.mu.Lock() if mset.batches == nil { @@ -6339,6 +6338,37 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr batches.group[batchId] = b } + var commit bool + if c := sliceHeader(JSBatchCommit, hdr); c != nil { + // Reject the batch if the commit is not recognized. 
+ if !bytes.Equal(c, []byte("1")) { + b.cleanupLocked(batchId, batches) + batches.mu.Unlock() + err := NewJSAtomicPublishInvalidBatchCommitError() + if canRespond { + b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err}) + outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0)) + } + return err + } + commit = true + } + + // The required API level can have the batch be rejected. But the header is always removed. + if len(sliceHeader(JSRequiredApiLevel, hdr)) != 0 { + if errorOnRequiredApiLevel(hdr) { + b.cleanupLocked(batchId, batches) + batches.mu.Unlock() + err := NewJSRequiredApiLevelError() + if canRespond { + b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: err}) + outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0)) + } + return err + } + hdr = removeHeaderIfPresent(hdr, JSRequiredApiLevel) + } + // Detect gaps. b.lseq++ if b.lseq != batchSeq { @@ -6482,7 +6512,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr return errorOnUnsupported(seq, JSExpectedLastMsgId) } - if bhdr, bmsg, _, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, bhdr, bmsg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { + if bhdr, bmsg, _, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, bsubj, bhdr, bmsg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { rollback(seq) b.cleanupLocked(batchId, batches) batches.mu.Unlock() diff --git a/server/sublist.go b/server/sublist.go index 2bed802c180..f759cb084fb 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -995,6 +995,9 @@ func (n *node) isEmpty() bool { // Return the number of nodes for the given level. 
func (l *level) numNodes() int { + if l == nil { + return 0 + } num := len(l.nodes) if l.pwc != nil { num++ @@ -1758,39 +1761,49 @@ func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb f if len(nsubj) > 0 { nsubj = append(subj, '.') } - switch { - case r.fwc != nil: + if r.fwc != nil { // We've reached a full wildcard, do a FWC match on the stree at this point // and don't keep iterating downward. nsubj := append(nsubj, '>') st.Match(nsubj, cb) - case r.pwc != nil: + return + } + if r.pwc != nil { // We've found a partial wildcard. We'll keep iterating downwards, but first // check whether there's interest at this level (without triggering dupes) and // match if so. + var done bool nsubj := append(nsubj, '*') if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 { st.Match(nsubj, cb) + done = true } - if r.pwc.next != nil && r.pwc.next.numNodes() > 0 { + if r.pwc.next.numNodes() > 0 { intersectStree(st, r.pwc.next, nsubj, cb) } - default: - // Normal node with subject literals, keep iterating. - for t, n := range r.nodes { - nsubj := append(nsubj, t...) - if len(n.psubs)+len(n.qsubs) > 0 { - if subjectHasWildcard(bytesToString(nsubj)) { - st.Match(nsubj, cb) - } else { - if e, ok := st.Find(nsubj); ok { - cb(nsubj, e) - } + if done { + return + } + } + // Normal node with subject literals, keep iterating. + for t, n := range r.nodes { + if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 { + // A wildcard at the next level will already visit these descendents + // so skip so we don't callback the same subject more than once. + continue + } + nsubj := append(nsubj, t...) 
+ if len(n.psubs)+len(n.qsubs) > 0 { + if subjectHasWildcard(bytesToString(nsubj)) { + st.Match(nsubj, cb) + } else { + if e, ok := st.Find(nsubj); ok { + cb(nsubj, e) } } - if n.next != nil && n.next.numNodes() > 0 { - intersectStree(st, n.next, nsubj, cb) - } + } + if n.next.numNodes() > 0 { + intersectStree(st, n.next, nsubj, cb) } } } diff --git a/server/sublist_test.go b/server/sublist_test.go index acc5b53af25..e1eaf23d432 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -2086,6 +2086,35 @@ func TestSublistInterestBasedIntersection(t *testing.T) { require_NoDuplicates(t, got) }) + t.Run("PWCExtended", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("stream.*.child")) + sl.Insert(newSub("stream.A")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 2) + require_NoDuplicates(t, got) + }) + + t.Run("PWCExtendedAggressive", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("stream.A.child")) + sl.Insert(newSub("*.A.child")) + sl.Insert(newSub("stream.*.child")) + sl.Insert(newSub("stream.A.*")) + sl.Insert(newSub("stream.*.*")) + sl.Insert(newSub("*.A.*")) + sl.Insert(newSub("*.*.child")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 1) + require_NoDuplicates(t, got) + }) + t.Run("FWCAll", func(t *testing.T) { got := map[string]int{} sl := NewSublistNoCache() diff --git a/server/thw/thw.go b/server/thw/thw.go index 712783312b7..bcbc16e9a3a 100644 --- a/server/thw/thw.go +++ b/server/thw/thw.go @@ -48,6 +48,12 @@ type HashWheel struct { count uint64 // How many entries are present? } +// HashWheelEntry represents a single entry in the wheel. +type HashWheelEntry struct { + Seq uint64 + Expires int64 +} + // NewHashWheel initializes a new HashWheel. 
func NewHashWheel() *HashWheel { return &HashWheel{ @@ -61,17 +67,6 @@ func (hw *HashWheel) getPosition(expires int64) int64 { return (expires / tickDuration) & wheelMask } -// updateLowestExpires finds the new lowest expiration time across all slots. -func (hw *HashWheel) updateLowestExpires() { - lowest := int64(math.MaxInt64) - for _, s := range hw.wheel { - if s != nil && s.lowest < lowest { - lowest = s.lowest - } - } - hw.lowest = lowest -} - // newSlot creates a new slot. func newSlot() *slot { return &slot{ @@ -120,22 +115,7 @@ func (hw *HashWheel) Remove(seq uint64, expires int64) error { // If the slot is empty, we can set it to nil to free memory. if len(s.entries) == 0 { hw.wheel[pos] = nil - } else if expires == s.lowest { - // Find new lowest in this slot. - lowest := int64(math.MaxInt64) - for _, exp := range s.entries { - if exp < lowest { - lowest = exp - } - } - s.lowest = lowest } - - // If we removed the global lowest, find the new one. - if expires == hw.lowest { - hw.updateLowestExpires() - } - return nil }