From e0ae88c5f07defa2357de410c3d1287ab9042013 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 26 Oct 2023 17:39:27 -0700 Subject: [PATCH] Make sure if we have lots of subjects to fallback to subjects state vs get last. Signed-off-by: Derek Collison --- server/consumer.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index f53c7726bce..3b7fe4dd63b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4687,8 +4687,9 @@ func (o *consumer) selectStartingSeqNo() { } } } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { + // A threshold for when we switch from get last msg to subjects state. + const numSubjectsThresh = 256 lss := &lastSeqSkipList{resume: state.LastSeq} - var smv StoreMsg var filters []string if o.subjf == nil { filters = append(filters, o.cfg.FilterSubject) @@ -4698,9 +4699,16 @@ func (o *consumer) selectStartingSeqNo() { } } for _, filter := range filters { - for subj := range o.mset.store.SubjectsTotals(filter) { - if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil { - lss.seqs = append(lss.seqs, sm.seq) + if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh { + var smv StoreMsg + for subj := range st { + if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil { + lss.seqs = append(lss.seqs, sm.seq) + } + } + } else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 { + for _, ss := range mss { + lss.seqs = append(lss.seqs, ss.Last) } } }