diff --git a/server/consumer.go b/server/consumer.go
index 0c01da94f07..ec98c8ae92c 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1392,6 +1392,10 @@ func (o *consumer) setLeader(isLeader bool) {
 	} else if o.srv.gateway.enabled {
 		stopAndClearTimer(&o.gwdtmr)
 	}
+	// If we were the leader make sure to drain queued up acks.
+	if wasLeader {
+		o.ackMsgs.drain()
+	}
 	o.mu.Unlock()
 
 	// Unregister as a leader with our parent stream.
@@ -2534,19 +2538,18 @@ func (o *consumer) applyState(state *ConsumerState) {
 		return
 	}
 
-	// If o.sseq is greater don't update. Don't go backwards on o.sseq.
-	if o.sseq <= state.Delivered.Stream {
+	// If o.sseq is greater don't update. Don't go backwards on o.sseq if leader.
+	if !o.isLeader() || o.sseq <= state.Delivered.Stream {
 		o.sseq = state.Delivered.Stream + 1
 	}
 	o.dseq = state.Delivered.Consumer + 1
-	o.adflr = state.AckFloor.Consumer
 	o.asflr = state.AckFloor.Stream
 	o.pending = state.Pending
 	o.rdc = state.Redelivered
 
 	// Setup tracking timer if we have restored pending.
-	if len(o.pending) > 0 {
+	if o.isLeader() && len(o.pending) > 0 {
 		// This is on startup or leader change. We want to check pending
 		// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s
 		delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond
@@ -2788,6 +2791,12 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
 		return
 	}
 
+	mset := o.mset
+	if mset == nil || mset.closed.Load() {
+		o.mu.Unlock()
+		return
+	}
+
 	var sagap uint64
 	var needSignal bool
 
@@ -2803,17 +2812,20 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
 		delete(o.pending, sseq)
 		// Use the original deliver sequence from our pending record.
 		dseq = p.Sequence
-	}
-	if len(o.pending) == 0 {
-		o.adflr, o.asflr = o.dseq-1, o.sseq-1
-	} else if dseq == o.adflr+1 {
-		o.adflr, o.asflr = dseq, sseq
-		for ss := sseq + 1; ss < o.sseq; ss++ {
-			if p, ok := o.pending[ss]; ok {
-				if p.Sequence > 0 {
-					o.adflr, o.asflr = p.Sequence-1, ss-1
+		// Only move floors if we matched an existing pending.
+		if dseq == o.adflr+1 {
+			o.adflr, o.asflr = dseq, sseq
+			for ss := sseq + 1; ss < o.sseq; ss++ {
+				if p, ok := o.pending[ss]; ok {
+					if p.Sequence > 0 {
+						o.adflr, o.asflr = p.Sequence-1, ss-1
+					}
+					break
 				}
-				break
+			}
+			// If nothing left set to current delivered.
+			if len(o.pending) == 0 {
+				o.adflr, o.asflr = o.dseq-1, o.sseq-1
 			}
 		}
 	}
@@ -2845,7 +2857,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
 	// Update underlying store.
 	o.updateAcks(dseq, sseq)
 
-	mset := o.mset
 	clustered := o.node != nil
 
 	// In case retention changes for a stream, this ought to have been updated
@@ -3092,10 +3103,16 @@ func (wq *waitQueue) add(wr *waitingRequest) error {
 }
 
 func (wq *waitQueue) isFull() bool {
+	if wq == nil {
+		return false
+	}
 	return wq.n == wq.max
 }
 
 func (wq *waitQueue) isEmpty() bool {
+	if wq == nil {
+		return true
+	}
 	return wq.n == 0
 }
 
@@ -3627,7 +3644,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 				break
 			}
 		}
-	}
 
 	// Don't sort the o.subjf if it's only one entry
@@ -4034,8 +4050,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
 		pmsg, dc, err = o.getNextMsg()
 
 		// We can release the lock now under getNextMsg so need to check this condition again here.
-		// consumer is closed when mset is set to nil.
-		if o.mset == nil {
+		if o.closed || o.mset == nil {
 			o.mu.Unlock()
 			return
 		}
@@ -4286,50 +4301,55 @@ func (o *consumer) streamNumPending() uint64 {
 		o.npc, o.npf = 0, 0
 		return 0
 	}
+	npc, npf := o.calculateNumPending()
+	o.npc, o.npf = int64(npc), npf
+	return o.numPending()
+}
+
+// Will calculate num pending but only requires a read lock.
+// Depends on delivery policy, for last per subject we calculate differently.
+// At least RLock should be held.
+func (o *consumer) calculateNumPending() (npc, npf uint64) {
+	if o.mset == nil || o.mset.store == nil {
+		return 0, 0
+	}
 	isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject
 
 	// Deliver Last Per Subject calculates num pending differently.
 	if isLastPerSubject {
-		o.npc, o.npf = 0, 0
 		// Consumer without filters.
 		if o.subjf == nil {
-			npc, npf := o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
-			o.npc, o.npf = int64(npc), npf
-			return o.numPending()
+			return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
 		}
 		// Consumer with filters.
 		for _, filter := range o.subjf {
-			npc, npf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
-			o.npc += int64(npc)
-			if npf > o.npf {
-				o.npf = npf // Always last
+			lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
+			npc += lnpc
+			if lnpf > npf {
+				npf = lnpf // Always last
			}
 		}
-		return o.numPending()
+		return npc, npf
 	}
 	// Every other Delivery Policy is handled here.
 	// Consumer without filters.
 	if o.subjf == nil {
-		npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
-		o.npc, o.npf = int64(npc), npf
-		return o.numPending()
+		return o.mset.store.NumPending(o.sseq, _EMPTY_, false)
 	}
 	// Consumer with filters.
-	o.npc, o.npf = 0, 0
 	for _, filter := range o.subjf {
 		// We might loose state of o.subjf, so if we do recover from o.sseq
 		if filter.currentSeq < o.sseq {
 			filter.currentSeq = o.sseq
 		}
-		npc, npf := o.mset.store.NumPending(filter.currentSeq, filter.subject, isLastPerSubject)
-		o.npc += int64(npc)
-		if npf > o.npf {
-			o.npf = npf // Always last
+		lnpc, lnpf := o.mset.store.NumPending(filter.currentSeq, filter.subject, false)
+		npc += lnpc
+		if lnpf > npf {
+			npf = lnpf // Always last
 		}
 	}
-
-	return o.numPending()
+	return npc, npf
 }
 
 func convertToHeadersOnly(pmsg *jsPubMsg) {
@@ -4381,6 +4401,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
 	// Cant touch pmsg after this sending so capture what we need.
 	seq, ts := pmsg.seq, pmsg.ts
+
+	// Update delivered first.
+	o.updateDelivered(dseq, seq, dc, ts)
+
 	// Send message.
 	o.outq.send(pmsg)
 
@@ -4401,9 +4425,6 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
 		o.waiting.last = time.Now()
 	}
 
-	// FIXME(dlc) - Capture errors?
-	o.updateDelivered(dseq, seq, dc, ts)
-
 	// If we are ack none and mset is interest only we should make sure stream removes interest.
 	if ap == AckNone && rp != LimitsPolicy {
 		if o.node == nil || o.cfg.Direct {
@@ -4646,7 +4667,11 @@ func (o *consumer) checkPending() {
 	check := len(o.pending) > 1024
 	for seq, p := range o.pending {
 		if check && atomic.LoadInt64(&o.awl) > 0 {
-			o.ptmr.Reset(100 * time.Millisecond)
+			if o.ptmr == nil {
+				o.ptmr = time.AfterFunc(100*time.Millisecond, o.checkPending)
+			} else {
+				o.ptmr.Reset(100 * time.Millisecond)
+			}
 			return
 		}
 		// Check if these are no longer valid.
@@ -5391,7 +5416,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
 	// If it was pending process it like an ack.
 	if wasPending {
-		// We could have lock for stream so do this in a go routine.
+		// We could have the lock for the stream so do this in a go routine.
 		// TODO(dlc) - We should do this with ipq vs naked go routines.
 		go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason)
 	}
@@ -5549,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream() error {
 		return errAckFloorHigherThanLastSeq
 	}
 
-	for seq := ss.FirstSeq; seq <= asflr; seq++ {
+	for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
 		mset.ackMsg(o, seq)
 	}
diff --git a/server/jetstream.go b/server/jetstream.go
index ab6debdacbc..2b877ad8625 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -1410,10 +1410,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
 				if !cfg.Created.IsZero() {
 					obs.setCreatedTime(cfg.Created)
 				}
-				lseq := e.mset.lastSeq()
-				obs.mu.Lock()
-				err = obs.readStoredState(lseq)
-				obs.mu.Unlock()
 				if err != nil {
 					s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err)
 				}
@@ -2290,8 +2286,8 @@ func (jsa *jsAccount) delete() {
 	jsa.templates = nil
 	jsa.mu.Unlock()
 
-	for _, ms := range streams {
-		ms.stop(false, false)
+	for _, mset := range streams {
+		mset.stop(false, false)
 	}
 
 	for _, t := range ts {
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index 0b5d5634469..9aaf90fc418 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -1946,7 +1946,17 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
 			return
 		}
 	}
-	config := mset.config()
+
+	mset.mu.RLock()
+	config := mset.cfg
+	checkAcks := config.Retention != LimitsPolicy && mset.hasLimitsSet()
+	mset.mu.RUnlock()
+
+	// Check if we are a clustered interest retention stream with limits.
+	// If so, check ack floors against our state.
+	if checkAcks {
+		mset.checkInterestState()
+	}
 
 	resp.StreamInfo = &StreamInfo{
 		Created: mset.createdTime(),
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index be8c35b0976..3bb10d14972 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -8471,6 +8471,7 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
 	mset.mu.RLock()
 	sysc, js, sa, config := mset.sysc, mset.srv.js.Load(), mset.sa, mset.cfg
 	isLeader := mset.isLeader()
+	checkAcks := isLeader && config.Retention != LimitsPolicy && mset.isClustered() && mset.hasLimitsSet()
 	mset.mu.RUnlock()
 
 	// By design all members will receive this. Normally we only want the leader answering.
@@ -8484,6 +8485,12 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
 		time.Sleep(500 * time.Millisecond)
 	}
 
+	// Check if we are a clustered interest retention stream with limits.
+	// If so, check ack floors against our state.
+	if checkAcks {
+		mset.checkInterestState()
+	}
+
 	si := &StreamInfo{
 		Created: mset.createdTime(),
 		State: mset.state(),
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 92492a58e8d..7e16aa58a74 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -35,6 +35,7 @@ import (
 
 	"github.com/nats-io/jwt/v2"
 	"github.com/nats-io/nats.go"
+	"github.com/nats-io/nuid"
 )
 
 func TestJetStreamClusterRemovePeerByID(t *testing.T) {
@@ -7037,3 +7038,347 @@ func TestJetStreamClusterConsumerPauseSurvivesRestart(t *testing.T) {
 	require_True(t, leader != nil)
 	checkTimer(leader)
 }
+
+func TestJetStreamClusterWorkQueueStreamOrphanIssue(t *testing.T) {
+	t.Run("R1F", func(t *testing.T) {
+		testJetStreamClusterWorkQueueStreamOrphanIssue(t, &nats.StreamConfig{
+			Name: "OWQTEST_R1F",
+			Subjects: []string{"MSGS.>"},
+			Replicas: 1,
+			MaxAge: 30 * time.Minute,
+			Duplicates: 5 * time.Minute,
+			Retention: nats.WorkQueuePolicy,
+			Discard: nats.DiscardOld,
+			AllowRollup: true,
+			Placement: &nats.Placement{
+				Tags: []string{"test"},
+			},
+		})
+	})
+	t.Run("R3M", func(t *testing.T) {
+		testJetStreamClusterWorkQueueStreamOrphanIssue(t, &nats.StreamConfig{
+			Name: "OWQTEST_R3M",
+			Subjects: []string{"MSGS.>"},
+			Replicas: 3,
+			MaxAge: 30 * time.Minute,
+			MaxMsgs: 100_000,
+			Duplicates: 5 * time.Minute,
+			Retention: nats.WorkQueuePolicy,
+			Discard: nats.DiscardNew,
+			AllowRollup: true,
+			Storage: nats.MemoryStorage,
+			Placement: &nats.Placement{
+				Tags: []string{"test"},
+			},
+		})
+	})
+	t.Run("R3F_DN", func(t *testing.T) {
+		testJetStreamClusterWorkQueueStreamOrphanIssue(t, &nats.StreamConfig{
+			Name: "OWQTEST_R3F_DN",
+			Subjects: []string{"MSGS.>"},
+			Replicas: 3,
+			MaxAge: 30 * time.Minute,
+			MaxMsgs: 100_000,
+			Duplicates: 5 * time.Minute,
+			Retention: nats.WorkQueuePolicy,
+			Discard: nats.DiscardNew,
+			AllowRollup: true,
+			Placement: &nats.Placement{
+				Tags: []string{"test"},
+			},
+		})
+	})
+	t.Run("R3F_DO", func(t *testing.T) {
+		testJetStreamClusterWorkQueueStreamOrphanIssue(t, &nats.StreamConfig{
+			Name: "OWQTEST_R3F_DO",
+			Subjects: []string{"MSGS.>"},
+			Replicas: 3,
+			MaxAge: 30 * time.Minute,
+			MaxMsgs: 100_000,
+			Duplicates: 5 * time.Minute,
+			Retention: nats.WorkQueuePolicy,
+			Discard: nats.DiscardOld,
+			AllowRollup: true,
+			Placement: &nats.Placement{
+				Tags: []string{"test"},
+			},
+		})
+	})
+}
+
+func testJetStreamClusterWorkQueueStreamOrphanIssue(t *testing.T, sc *nats.StreamConfig) {
+	conf := `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {
+		store_dir: '%s',
+	}
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+	server_tags: ["test"]
+	system_account: sys
+	no_auth_user: js
+	accounts {
+		sys { users = [ { user: sys, pass: sys } ] }
+		js {
+			jetstream = enabled
+			users = [ { user: js, pass: js } ]
+		}
+	}`
+	c := createJetStreamClusterWithTemplate(t, conf, sc.Name, 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	cnc, cjs := jsClientConnect(t, c.randomServer())
+	defer cnc.Close()
+
+	_, err := js.AddStream(sc)
+	require_NoError(t, err)
+
+	pctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	// Start producers
+	var wg sync.WaitGroup
+
+	// First call is just to create the pull subscribers.
+	mp := nats.MaxAckPending(10000)
+	mw := nats.PullMaxWaiting(1000)
+	aw := nats.AckWait(5 * time.Second)
+
+	for i := 0; i < 10; i++ {
+		for _, partition := range []string{"EEEEE"} {
+			subject := fmt.Sprintf("MSGS.%s.*.H.100XY.*.*.WQ.00000000000%d", partition, i)
+			consumer := fmt.Sprintf("consumer:%s:%d", partition, i)
+			_, err := cjs.PullSubscribe(subject, consumer, mp, mw, aw)
+			require_NoError(t, err)
+		}
+	}
+
+	// Create a single consumer that does no activity.
+	// Make sure we still calculate low ack properly and cleanup etc.
+	_, err = cjs.PullSubscribe("MSGS.ZZ.>", "consumer:ZZ:0", mp, mw, aw)
+	require_NoError(t, err)
+
+	subjects := []string{
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000000",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000001",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000002",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000003",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000004",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000005",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000006",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000007",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000008",
+		"MSGS.EEEEE.P.H.100XY.1.100Z.WQ.000000000009",
+	}
+	payload := []byte(strings.Repeat("A", 1024))
+
+	for i := 0; i < 50; i++ {
+		wg.Add(1)
+		go func() {
+			pnc, pjs := jsClientConnect(t, c.randomServer())
+			defer pnc.Close()
+
+			for i := 1; i < 200_000; i++ {
+				select {
+				case <-pctx.Done():
+					wg.Done()
+					return
+				default:
+				}
+				for _, subject := range subjects {
+					// Send each message a few times.
+					msgID := nats.MsgId(nuid.Next())
+					pjs.PublishAsync(subject, payload, msgID)
+					pjs.Publish(subject, payload, msgID, nats.AckWait(250*time.Millisecond))
+					pjs.Publish(subject, payload, msgID, nats.AckWait(250*time.Millisecond))
+				}
+			}
+			t.Logf("Stopped publishing.")
+		}()
+	}
+
+	// Rogue publisher that sends the same msg ID everytime.
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			pnc, pjs := jsClientConnect(t, c.randomServer())
+			defer pnc.Close()
+
+			msgID := nats.MsgId("1234567890")
+			for i := 1; ; i++ {
+				select {
+				case <-pctx.Done():
+					wg.Done()
+					return
+				default:
+				}
+				for _, subject := range subjects {
+					// Send each message a few times.
+					pjs.PublishAsync(subject, payload, msgID, nats.RetryAttempts(0), nats.RetryWait(0))
+					pjs.Publish(subject, payload, msgID, nats.AckWait(1*time.Millisecond), nats.RetryAttempts(0), nats.RetryWait(0))
+					pjs.Publish(subject, payload, msgID, nats.AckWait(1*time.Millisecond), nats.RetryAttempts(0), nats.RetryWait(0))
+				}
+			}
+		}()
+	}
+
+	// Let enough messages into the stream then start consumers.
+	time.Sleep(15 * time.Second)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
+	defer cancel()
+
+	for i := 0; i < 10; i++ {
+		subject := fmt.Sprintf("MSGS.EEEEE.*.H.100XY.*.*.WQ.00000000000%d", i)
+		consumer := fmt.Sprintf("consumer:EEEEE:%d", i)
+		for n := 0; n < 5; n++ {
+			cpnc, cpjs := jsClientConnect(t, c.randomServer())
+			defer cpnc.Close()
+
+			psub, err := cpjs.PullSubscribe(subject, consumer, mp)
+			require_NoError(t, err)
+
+			time.AfterFunc(15*time.Second, func() {
+				cpnc.Close()
+			})
+
+			wg.Add(1)
+			go func() {
+				tick := time.NewTicker(1 * time.Millisecond)
+				for {
+					if cpnc.IsClosed() {
+						wg.Done()
+						return
+					}
+					select {
+					case <-ctx.Done():
+						wg.Done()
+						return
+					case <-tick.C:
+						// Fetch 1 first, then if no errors Fetch 100.
+						msgs, err := psub.Fetch(1, nats.MaxWait(200*time.Millisecond))
+						if err != nil {
+							continue
+						}
+						for _, msg := range msgs {
+							msg.Ack()
+						}
+						msgs, err = psub.Fetch(100, nats.MaxWait(200*time.Millisecond))
+						if err != nil {
+							continue
+						}
+						for _, msg := range msgs {
+							msg.Ack()
+						}
+						msgs, err = psub.Fetch(1000, nats.MaxWait(200*time.Millisecond))
+						if err != nil {
+							continue
+						}
+						for _, msg := range msgs {
+							msg.Ack()
+						}
+					}
+				}
+			}()
+		}
+	}
+
+	for i := 0; i < 10; i++ {
+		subject := fmt.Sprintf("MSGS.EEEEE.*.H.100XY.*.*.WQ.00000000000%d", i)
+		consumer := fmt.Sprintf("consumer:EEEEE:%d", i)
+		for n := 0; n < 10; n++ {
+			cpnc, cpjs := jsClientConnect(t, c.randomServer())
+			defer cpnc.Close()
+
+			psub, err := cpjs.PullSubscribe(subject, consumer, mp)
+			if err != nil {
+				t.Logf("ERROR: %v", err)
+				continue
+			}
+
+			wg.Add(1)
+			go func() {
+				tick := time.NewTicker(1 * time.Millisecond)
+				for {
+					select {
+					case <-ctx.Done():
+						wg.Done()
+						return
+					case <-tick.C:
+						// Fetch 1 first, then if no errors Fetch 100.
+						msgs, err := psub.Fetch(1, nats.MaxWait(200*time.Millisecond))
+						if err != nil {
+							continue
+						}
+						for _, msg := range msgs {
+							msg.Ack()
+						}
+						msgs, err = psub.Fetch(100, nats.MaxWait(200*time.Millisecond))
+						if err != nil {
+							continue
+						}
+						for _, msg := range msgs {
+							msg.Ack()
+						}
+
+						msgs, err = psub.Fetch(1000, nats.MaxWait(200*time.Millisecond))
+						if err != nil {
+							continue
+						}
+						for _, msg := range msgs {
+							msg.Ack()
+						}
+					}
+				}
+			}()
+		}
+	}
+
+	time.AfterFunc(10*time.Second, func() {
+		if sc.Replicas == 1 {
+			// Find server leader of the stream and restart it.
+			leaderSrv := c.streamLeader("js", sc.Name)
+			leaderSrv.Shutdown()
+			leaderSrv.WaitForShutdown()
+			c.restartServer(leaderSrv)
+		} else {
+			// NOTE (wq): For R=3, not sure which server causes the issue here
+			// so this may be have flaky behavior.
+			s := c.servers[0]
+			s.optsMu.Lock()
+			s.opts.LameDuckDuration = 5 * time.Second
+			s.opts.LameDuckGracePeriod = -5 * time.Second
+			s.optsMu.Unlock()
+			s.lameDuckMode()
+			s.WaitForShutdown()
+			c.restartServer(s)
+			c.waitOnClusterReady()
+		}
+	})
+
+	// Wait until context is done then check state.
+	<-ctx.Done()
+
+	var consumerPending int
+	for i := 0; i < 10; i++ {
+		ci, err := js.ConsumerInfo(sc.Name, fmt.Sprintf("consumer:EEEEE:%d", i))
+		require_NoError(t, err)
+		consumerPending += int(ci.NumPending)
+	}
+
+	// Check state of streams and consumers.
+	si, err := js.StreamInfo(sc.Name)
+	require_NoError(t, err)
+
+	streamPending := int(si.State.Msgs)
+	if streamPending != consumerPending {
+		t.Fatalf("Unexpected number of pending messages, stream=%d, consumers=%d", streamPending, consumerPending)
+	}
+}
diff --git a/server/stream.go b/server/stream.go
index 93d8d0ae283..0462a96787e 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -268,6 +268,7 @@ type stream struct {
 	sch chan struct{}
 	sigq *ipQueue[*cMsg]
 	csl *Sublist // Consumer Sublist
+
 	// Leader will store seq/msgTrace in clustering mode. Used in applyStreamEntries
 	// to know if trace event should be sent after processing.
 	mt map[uint64]*msgTrace
@@ -859,6 +860,12 @@ func (mset *stream) setLeader(isLeader bool) error {
 		mset.leader = _EMPTY_
 	}
 	mset.mu.Unlock()
+
+	if mset.isInterestRetention() {
+		// If we are interest based make sure to check consumers.
+		// This is to make sure we process any outstanding acks.
+		mset.checkInterestState()
+	}
 	return nil
 }
@@ -3983,6 +3990,12 @@ func (mset *stream) isClustered() bool {
 	return mset.node != nil
 }
 
+// Return if any limits are set.
+// Lock should be held.
+func (mset *stream) hasLimitsSet() bool {
+	return mset.cfg.MaxMsgs > 0 || mset.cfg.MaxBytes > 0 || mset.cfg.MaxMsgsPer > 0 || mset.cfg.MaxAge > 0
+}
+
 // Used if we have to queue things internally to avoid the route/gw path.
 type inMsg struct {
 	subj string
@@ -5230,9 +5243,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 	accName := jsa.account.Name
 	jsa.mu.Unlock()
 
-	// Mark as closed, kick monitor and collect consumers first.
-	mset.closed.Store(true)
-
+	// Kick monitor and collect consumers first.
 	mset.mu.Lock()
 	// Signal to the monitor loop.
 	// Can't use qch here.
@@ -5294,6 +5305,9 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		mset.sendDeleteAdvisoryLocked()
 	}
 
+	// Mark closed.
+	mset.closed.Store(true)
+
 	// Quit channel, do this after sending the delete advisory
 	if mset.qch != nil {
 		close(mset.qch)
 	}
@@ -5425,24 +5439,87 @@ func (mset *stream) getPublicConsumers() []*consumer {
 }
 
 // Will check for interest retention and make sure messages
-// that have been acked are processed.
+// that have been acked are processed and removed.
+// This will check the ack floors of all consumers, and adjust our first sequence accordingly.
 func (mset *stream) checkInterestState() {
 	if mset == nil {
 		return
 	}
-	mset.mu.RLock()
-	// If we are limits based nothing to do.
-	if mset.cfg.Retention == LimitsPolicy {
-		mset.mu.RUnlock()
+	if !mset.isInterestRetention() {
+		// If we are limits based nothing to do.
 		return
 	}
-	consumers := make([]*consumer, 0, len(mset.consumers))
-	for _, o := range mset.consumers {
-		consumers = append(consumers, o)
+
+	mset.clsMu.RLock()
+	var zeroAcks []*consumer
+	var lowAckFloor uint64 = math.MaxUint64
+	for _, o := range mset.cList {
+		o.mu.RLock()
+		if o.isLeader() {
+			// We need to account for consumers with ack floor of zero.
+			// We will collect them and see if we need to check pending below.
+			if o.asflr == 0 {
+				zeroAcks = append(zeroAcks, o)
+			} else if o.asflr < lowAckFloor {
+				lowAckFloor = o.asflr
+			}
+		} else {
+			// We are a follower so only have the store state, so read that in.
+			state, err := o.store.State()
+			if err != nil {
+				// On error we will not have enough information to process correctly so bail.
+				o.mu.RUnlock()
+				mset.clsMu.RUnlock()
+				return
+			}
+			// We need to account for consumers with ack floor of zero.
+			if state.AckFloor.Stream == 0 {
+				zeroAcks = append(zeroAcks, o)
+			} else if state.AckFloor.Stream < lowAckFloor {
+				lowAckFloor = state.AckFloor.Stream
+			}
+			// We are a follower here but if we detect a drift from when we were previous leader correct here.
+			if o.asflr > state.AckFloor.Stream || o.sseq > state.Delivered.Stream+1 {
+				o.applyState(state)
+			}
+		}
+		o.mu.RUnlock()
 	}
-	mset.mu.RUnlock()
-	for _, o := range consumers {
-		o.checkStateForInterestStream()
+	mset.clsMu.RUnlock()
+
+	// If nothing was set we can bail.
+	if lowAckFloor == math.MaxUint64 {
+		return
+	}
+
+	// Hold stream write lock in case we need to purge.
+	mset.mu.Lock()
+	defer mset.mu.Unlock()
+
+	// Capture our current state.
+	var state StreamState
+	mset.store.FastState(&state)
+
+	if lowAckFloor < math.MaxUint64 && lowAckFloor > state.FirstSeq {
+		// Check if we had any zeroAcks, we will need to check them.
+		for _, o := range zeroAcks {
+			var np uint64
+			o.mu.RLock()
+			if o.isLeader() {
+				np = uint64(o.numPending())
+			} else {
+				np, _ = o.calculateNumPending()
+			}
+			o.mu.RUnlock()
+			// This means we have pending and can not remove anything at this time.
+			if np > 0 {
+				return
+			}
+		}
+		// Purge the stream to lowest ack floor + 1
+		mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0)
+		// Also make sure we clear any pending acks.
+		mset.clearAllPreAcksBelowFloor(lowAckFloor + 1)
 	}
 }
@@ -5805,19 +5882,22 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
 		return
 	}
 
+	store := mset.store
 	var state StreamState
-	mset.store.FastState(&state)
+	store.FastState(&state)
 
-	// Make sure this sequence is not below our first sequence.
-	if seq < state.FirstSeq {
-		mset.clearPreAck(o, seq)
+	// If this has arrived before we have processed the message itself.
+	if seq > state.LastSeq {
+		mset.registerPreAck(o, seq)
 		mset.mu.Unlock()
 		return
 	}
 
-	// If this has arrived before we have processed the message itself.
-	if seq > state.LastSeq {
-		mset.registerPreAck(o, seq)
+	// Always clear pre-ack if here.
+	mset.clearPreAck(o, seq)
+
+	// Make sure this sequence is not below our first sequence.
+	if seq < state.FirstSeq {
 		mset.mu.Unlock()
 		return
 	}
@@ -5839,7 +5919,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
 	}
 
 	// If we are here we should attempt to remove.
-	if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF {
+	if _, err := store.RemoveMsg(seq); err == ErrStoreEOF {
 		// This should not happen, but being pedantic.
 		mset.registerPreAckLock(o, seq)
 	}
diff --git a/server/test_test.go b/server/test_test.go
index 8f0d393aff9..704cc1e8452 100644
--- a/server/test_test.go
+++ b/server/test_test.go
@@ -303,6 +303,7 @@ func (c *cluster) shutdown() {
 	for i, s := range c.servers {
 		sd := s.StoreDir()
 		s.Shutdown()
+		s.WaitForShutdown()
 		if cf := c.opts[i].ConfigFile; cf != _EMPTY_ {
 			os.Remove(cf)
 		}