Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 10 additions & 32 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// If we have multiple filter subjects, create a sublist which we will use
// in calling store.LoadNextMsgMulti.
if len(o.cfg.FilterSubjects) > 0 {
o.filters = NewSublistWithCache()
o.filters = NewSublistNoCache()
for _, filter := range o.cfg.FilterSubjects {
o.filters.Insert(&subscription{subject: []byte(filter)})
}
Expand Down Expand Up @@ -2202,7 +2202,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
if len(o.subjf) == 1 {
o.filters = nil
} else {
o.filters = NewSublistWithCache()
o.filters = NewSublistNoCache()
for _, filter := range o.subjf {
o.filters.Insert(&subscription{subject: []byte(filter.subject)})
}
Expand Down Expand Up @@ -4061,7 +4061,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// Check if we are multi-filtered or not.
if filters != nil {
sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg)
} else if subjf != nil { // Means single filtered subject since o.filters means > 1.
} else if len(subjf) > 0 { // Means single filtered subject since o.filters means > 1.
filter, wc := subjf[0].subject, subjf[0].hasWildcard
sm, sseq, err = store.LoadNextMsg(filter, wc, fseq, &pmsg.StoreMsg)
} else {
Expand Down Expand Up @@ -4730,37 +4730,15 @@ func (o *consumer) calculateNumPending() (npc, npf uint64) {
}

isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject
filters, subjf := o.filters, o.subjf

// Deliver Last Per Subject calculates num pending differently.
if isLastPerSubject {
// Consumer without filters.
if o.subjf == nil {
return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
}
// Consumer with filters.
for _, filter := range o.subjf {
lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
}
}
return npc, npf
}
// Every other Delivery Policy is handled here.
// Consumer without filters.
if o.subjf == nil {
return o.mset.store.NumPending(o.sseq, _EMPTY_, false)
}
// Consumer with filters.
for _, filter := range o.subjf {
lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, false)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
}
if filters != nil {
return o.mset.store.NumPendingMulti(o.sseq, filters, isLastPerSubject)
} else if len(subjf) > 0 {
filter := subjf[0].subject
Comment thread
derekcollison marked this conversation as resolved.
return o.mset.store.NumPending(o.sseq, filter, isLastPerSubject)
}
return npc, npf
return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
}

func convertToHeadersOnly(pmsg *jsPubMsg) {
Expand Down
Loading