diff --git a/server/consumer.go b/server/consumer.go index f53c7726bce..3b7fe4dd63b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4687,8 +4687,9 @@ func (o *consumer) selectStartingSeqNo() { } } } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { + // A threshold for when we switch from get last msg to subjects state. + const numSubjectsThresh = 256 lss := &lastSeqSkipList{resume: state.LastSeq} - var smv StoreMsg var filters []string if o.subjf == nil { filters = append(filters, o.cfg.FilterSubject) @@ -4698,9 +4699,16 @@ func (o *consumer) selectStartingSeqNo() { } } for _, filter := range filters { - for subj := range o.mset.store.SubjectsTotals(filter) { - if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil { - lss.seqs = append(lss.seqs, sm.seq) + if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh { + var smv StoreMsg + for subj := range st { + if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil { + lss.seqs = append(lss.seqs, sm.seq) + } + } + } else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 { + for _, ss := range mss { + lss.seqs = append(lss.seqs, ss.Last) } } }