From c999177e1d53f47e4840923b72ba49ee2c62110d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 26 Oct 2023 19:15:35 -0700 Subject: [PATCH 1/2] If we have deliver last by subject and max msgs per subject of 1, we can short circuit to normal consumer. Signed-off-by: Derek Collison --- server/consumer.go | 70 +++++++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 3b7fe4dd63b..1f96e772e8c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4687,44 +4687,50 @@ func (o *consumer) selectStartingSeqNo() { } } } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { - // A threshold for when we switch from get last msg to subjects state. - const numSubjectsThresh = 256 - lss := &lastSeqSkipList{resume: state.LastSeq} - var filters []string - if o.subjf == nil { - filters = append(filters, o.cfg.FilterSubject) + // If our parent stream is set to max msgs per subject of 1 this is just + // a normal consumer at this point. We can avoid any heavy lifting. + if o.mset.cfg.MaxMsgsPer == 1 { + o.sseq = state.FirstSeq } else { - for _, filter := range o.subjf { - filters = append(filters, filter.subject) + // A threshold for when we switch from get last msg to subjects state. + const numSubjectsThresh = 256 + lss := &lastSeqSkipList{resume: state.LastSeq} + var filters []string + if o.subjf == nil { + filters = append(filters, o.cfg.FilterSubject) + } else { + for _, filter := range o.subjf { + filters = append(filters, filter.subject) + } } - } - for _, filter := range filters { - if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh { - var smv StoreMsg - for subj := range st { - if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil { - lss.seqs = append(lss.seqs, sm.seq) + for _, filter := range filters { + if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh { + var smv StoreMsg + for subj := range st { + if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil { + lss.seqs = append(lss.seqs, sm.seq) + } + } + } else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 { + for _, ss := range mss { + lss.seqs = append(lss.seqs, ss.Last) } - } - } else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 { - for _, ss := range mss { - lss.seqs = append(lss.seqs, ss.Last) } } + // Sort the skip list if needed. + if len(lss.seqs) > 1 { + sort.Slice(lss.seqs, func(i, j int) bool { + return lss.seqs[j] > lss.seqs[i] + }) + } + if len(lss.seqs) == 0 { + o.sseq = state.LastSeq + } else { + o.sseq = lss.seqs[0] + } + // Assign skip list. + o.lss = lss } - // Sort the skip list if needed. - if len(lss.seqs) > 1 { - sort.Slice(lss.seqs, func(i, j int) bool { - return lss.seqs[j] > lss.seqs[i] - }) - } - if len(lss.seqs) == 0 { - o.sseq = state.LastSeq - } else { - o.sseq = lss.seqs[0] - } - // Assign skip list. - o.lss = lss } else if o.cfg.OptStartTime != nil { // If we are here we are time based. // TODO(dlc) - Once clustered can't rely on this. From 85784a384b4c47c66306c044ccfb182611de4231 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 26 Oct 2023 19:27:41 -0700 Subject: [PATCH 2/2] Only load msgs if mb.fss has been evicted. Signed-off-by: Derek Collison --- server/filestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 4fcbea4d759..3157e6784f4 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2426,7 +2426,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { fs.mu.RLock() defer fs.mu.RUnlock() - if fs.state.Msgs == 0 { + if fs.state.Msgs == 0 || fs.noTrackSubjects() { return nil } @@ -2454,7 +2454,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { mb.mu.Lock() var shouldExpire bool - if mb.cacheNotLoaded() { + if mb.fss == nil { // Make sure we have fss loaded. mb.loadMsgsWithLock() shouldExpire = true