Skip to content

Commit

Permalink
Make sure if we have lots of subjects to fallback to subjects state (#…
Browse files Browse the repository at this point in the history
…4713)

This is optimization for last change, the get last works but not if
number of subjects is very large, for that fall back to subjects state.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Oct 27, 2023
2 parents 51b6a8e + e0ae88c commit 9edecc8
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4687,8 +4687,9 @@ func (o *consumer) selectStartingSeqNo() {
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var smv StoreMsg
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
Expand All @@ -4698,9 +4699,16 @@ func (o *consumer) selectStartingSeqNo() {
}
}
for _, filter := range filters {
for subj := range o.mset.store.SubjectsTotals(filter) {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
}
}
Expand Down

0 comments on commit 9edecc8

Please sign in to comment.