diff --git a/server/consumer.go b/server/consumer.go index 28951dec53e..2597d5d85b1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2476,9 +2476,7 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) { n += binary.PutUvarint(b[n:], dc) n += binary.PutVarint(b[n:], ts) o.propose(b[:n]) - } - if o.store != nil { - // Update local state always. + } else if o.store != nil { o.store.UpdateDelivered(dseq, sseq, dc, ts) } // Update activity. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 85ac8e4c75f..1d2c85bde02 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5141,25 +5141,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea buf := e.Data switch entryOp(buf[0]) { case updateDeliveredOp: - // These are handled in place in leaders. - if !isLeader { - dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:]) - if err != nil { - if mset, node := o.streamAndNode(); mset != nil && node != nil { - s := js.srv - s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]", - mset.account(), mset.name(), o, node.Group()) - } - panic(err.Error()) - } - // Make sure to update delivered under the lock. - o.mu.Lock() - err = o.store.UpdateDelivered(dseq, sseq, dc, ts) - o.ldt = time.Now() - o.mu.Unlock() - if err != nil { - panic(err.Error()) + dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:]) + if err != nil { + if mset, node := o.streamAndNode(); mset != nil && node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]", + mset.account(), mset.name(), o, node.Group()) } + panic(err.Error()) + } + // Make sure to update delivered under the lock. + o.mu.Lock() + err = o.store.UpdateDelivered(dseq, sseq, dc, ts) + o.ldt = time.Now() + o.mu.Unlock() + if err != nil { + panic(err.Error()) } case updateAcksOp: dseq, sseq, err := decodeAckUpdate(buf[1:])