diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6bde805cbc0..4321637476c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7679,7 +7679,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq interestPolicy, discard, maxMsgs, maxBytes := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes - isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed + isLeader, isSealed, compressOK := mset.isLeader(), mset.cfg.Sealed, mset.compressOK mset.mu.RUnlock() // This should not happen but possible now that we allow scale up, and scale down where this could trigger. @@ -7882,7 +7882,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ } } - esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), mset.compressOK) + esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), compressOK) var mtKey uint64 if mt != nil { mtKey = mset.clseq diff --git a/server/stream.go b/server/stream.go index 0f7aff269fe..180b5a535ff 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3113,7 +3113,7 @@ func (mset *stream) processAllSourceMsgs() { for _, im := range ims { if !mset.processInboundSourceMsg(im.si, im) { // If we are no longer leader bail. - if !mset.isLeader() { + if !mset.IsLeader() { cleanUp() return } @@ -3124,7 +3124,7 @@ func (mset *stream) processAllSourceMsgs() { msgs.recycle(&ims) case <-t.C: // If we are no longer leader bail. - if !mset.isLeader() { + if !mset.IsLeader() { cleanUp() return } @@ -3188,15 +3188,14 @@ func (mset *stream) handleFlowControl(m *inMsg) { // processInboundSourceMsg handles processing other stream messages bound for this stream. func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { + mset.mu.Lock() // If we are no longer the leader cancel this subscriber. if !mset.isLeader() { - mset.mu.Lock() mset.cancelSourceConsumer(si.iname) mset.mu.Unlock() return false } - mset.mu.Lock() isControl := m.isControlMsg() // Ignore from old subscriptions.