Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 68 additions & 43 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,10 @@ func (o *consumer) setLeader(isLeader bool) {
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
}
// If we were the leader make sure to drain queued up acks.
if wasLeader {
o.ackMsgs.drain()
}
o.mu.Unlock()

// Unregister as a leader with our parent stream.
Expand Down Expand Up @@ -2534,19 +2538,18 @@ func (o *consumer) applyState(state *ConsumerState) {
return
}

// If o.sseq is greater don't update. Don't go backwards on o.sseq.
if o.sseq <= state.Delivered.Stream {
// If o.sseq is greater don't update. Don't go backwards on o.sseq if leader.
if !o.isLeader() || o.sseq <= state.Delivered.Stream {
o.sseq = state.Delivered.Stream + 1
}
o.dseq = state.Delivered.Consumer + 1

o.adflr = state.AckFloor.Consumer
o.asflr = state.AckFloor.Stream
o.pending = state.Pending
o.rdc = state.Redelivered

// Setup tracking timer if we have restored pending.
if len(o.pending) > 0 {
if o.isLeader() && len(o.pending) > 0 {
// This is on startup or leader change. We want to check pending
// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s
delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond
Expand Down Expand Up @@ -2788,6 +2791,12 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
return
}

mset := o.mset
if mset == nil || mset.closed.Load() {
o.mu.Unlock()
return
}

var sagap uint64
var needSignal bool

Expand All @@ -2803,17 +2812,20 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
delete(o.pending, sseq)
// Use the original deliver sequence from our pending record.
dseq = p.Sequence
}
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
} else if dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
for ss := sseq + 1; ss < o.sseq; ss++ {
if p, ok := o.pending[ss]; ok {
if p.Sequence > 0 {
o.adflr, o.asflr = p.Sequence-1, ss-1
// Only move floors if we matched an existing pending.
if dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
for ss := sseq + 1; ss < o.sseq; ss++ {
if p, ok := o.pending[ss]; ok {
if p.Sequence > 0 {
o.adflr, o.asflr = p.Sequence-1, ss-1
}
break
}
break
}
// If nothing left set to current delivered.
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
}
}
}
Expand Down Expand Up @@ -2845,7 +2857,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
// Update underlying store.
o.updateAcks(dseq, sseq)

mset := o.mset
clustered := o.node != nil

// In case retention changes for a stream, this ought to have been updated
Expand Down Expand Up @@ -3092,10 +3103,16 @@ func (wq *waitQueue) add(wr *waitingRequest) error {
}

func (wq *waitQueue) isFull() bool {
if wq == nil {
return false
}
return wq.n == wq.max
}

func (wq *waitQueue) isEmpty() bool {
if wq == nil {
return true
}
return wq.n == 0
}

Expand Down Expand Up @@ -3627,7 +3644,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
break
}
}

}

// Don't sort the o.subjf if it's only one entry
Expand Down Expand Up @@ -4034,8 +4050,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
pmsg, dc, err = o.getNextMsg()

// We can release the lock now under getNextMsg so need to check this condition again here.
// consumer is closed when mset is set to nil.
if o.mset == nil {
if o.closed || o.mset == nil {
o.mu.Unlock()
return
}
Expand Down Expand Up @@ -4286,50 +4301,55 @@ func (o *consumer) streamNumPending() uint64 {
o.npc, o.npf = 0, 0
return 0
}
npc, npf := o.calculateNumPending()
o.npc, o.npf = int64(npc), npf
return o.numPending()
}

// Will calculate num pending but only requires a read lock.
// Depends on delivery policy, for last per subject we calculate differently.
// At least RLock should be held.
func (o *consumer) calculateNumPending() (npc, npf uint64) {
if o.mset == nil || o.mset.store == nil {
return 0, 0
}

isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject

// Deliver Last Per Subject calculates num pending differently.
if isLastPerSubject {
o.npc, o.npf = 0, 0
// Consumer without filters.
if o.subjf == nil {
npc, npf := o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
o.npc, o.npf = int64(npc), npf
return o.numPending()
return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
}
// Consumer with filters.
for _, filter := range o.subjf {
npc, npf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
o.npc += int64(npc)
if npf > o.npf {
o.npf = npf // Always last
lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
}
}
return o.numPending()
return npc, npf
}
// Every other Delivery Policy is handled here.
// Consumer without filters.
if o.subjf == nil {
npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
o.npc, o.npf = int64(npc), npf
return o.numPending()
return o.mset.store.NumPending(o.sseq, _EMPTY_, false)
}
// Consumer with filters.
o.npc, o.npf = 0, 0
for _, filter := range o.subjf {
// We might loose state of o.subjf, so if we do recover from o.sseq
if filter.currentSeq < o.sseq {
filter.currentSeq = o.sseq
}
npc, npf := o.mset.store.NumPending(filter.currentSeq, filter.subject, isLastPerSubject)
o.npc += int64(npc)
if npf > o.npf {
o.npf = npf // Always last
lnpc, lnpf := o.mset.store.NumPending(filter.currentSeq, filter.subject, false)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
}
}

return o.numPending()
return npc, npf
}

func convertToHeadersOnly(pmsg *jsPubMsg) {
Expand Down Expand Up @@ -4381,6 +4401,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,

// Cant touch pmsg after this sending so capture what we need.
seq, ts := pmsg.seq, pmsg.ts

// Update delivered first.
o.updateDelivered(dseq, seq, dc, ts)

// Send message.
o.outq.send(pmsg)

Expand All @@ -4401,9 +4425,6 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
o.waiting.last = time.Now()
}

// FIXME(dlc) - Capture errors?
o.updateDelivered(dseq, seq, dc, ts)

// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && rp != LimitsPolicy {
if o.node == nil || o.cfg.Direct {
Expand Down Expand Up @@ -4646,7 +4667,11 @@ func (o *consumer) checkPending() {
check := len(o.pending) > 1024
for seq, p := range o.pending {
if check && atomic.LoadInt64(&o.awl) > 0 {
o.ptmr.Reset(100 * time.Millisecond)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(100*time.Millisecond, o.checkPending)
} else {
o.ptmr.Reset(100 * time.Millisecond)
}
return
}
// Check if these are no longer valid.
Expand Down Expand Up @@ -5391,7 +5416,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {

// If it was pending process it like an ack.
if wasPending {
// We could have lock for stream so do this in a go routine.
// We could have the lock for the stream so do this in a go routine.
// TODO(dlc) - We should do this with ipq vs naked go routines.
go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason)
}
Expand Down Expand Up @@ -5549,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream() error {
return errAckFloorHigherThanLastSeq
}

for seq := ss.FirstSeq; seq <= asflr; seq++ {
for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
mset.ackMsg(o, seq)
}

Expand Down
8 changes: 2 additions & 6 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1410,10 +1410,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
if !cfg.Created.IsZero() {
obs.setCreatedTime(cfg.Created)
}
lseq := e.mset.lastSeq()
obs.mu.Lock()
err = obs.readStoredState(lseq)
obs.mu.Unlock()
if err != nil {
s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err)
}
Expand Down Expand Up @@ -2290,8 +2286,8 @@ func (jsa *jsAccount) delete() {
jsa.templates = nil
jsa.mu.Unlock()

for _, ms := range streams {
ms.stop(false, false)
for _, mset := range streams {
mset.stop(false, false)
}

for _, t := range ts {
Expand Down
12 changes: 11 additions & 1 deletion server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1946,7 +1946,17 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
return
}
}
config := mset.config()

mset.mu.RLock()
config := mset.cfg
checkAcks := config.Retention != LimitsPolicy && mset.hasLimitsSet()
mset.mu.RUnlock()

// Check if we are a clustered interest retention stream with limits.
// If so, check ack floors against our state.
if checkAcks {
mset.checkInterestState()
}

resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
Expand Down
7 changes: 7 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8471,6 +8471,7 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
mset.mu.RLock()
sysc, js, sa, config := mset.sysc, mset.srv.js.Load(), mset.sa, mset.cfg
isLeader := mset.isLeader()
checkAcks := isLeader && config.Retention != LimitsPolicy && mset.isClustered() && mset.hasLimitsSet()
mset.mu.RUnlock()

// By design all members will receive this. Normally we only want the leader answering.
Expand All @@ -8484,6 +8485,12 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
time.Sleep(500 * time.Millisecond)
}

// Check if we are a clustered interest retention stream with limits.
// If so, check ack floors against our state.
if checkAcks {
mset.checkInterestState()
}

si := &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Expand Down
Loading