diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f6e148f566d..c0d0c99a988 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -8885,7 +8885,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ ) diff := &batchStagedDiff{} if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { - // TODO(mvv): reset in-memory expected header maps mset.clMu.Unlock() if err == errMsgIdDuplicate && dseq > 0 { var buf [256]byte @@ -8916,12 +8915,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ } // Do proposal. - err = node.Propose(esm) - // TODO(mvv): reset in-memory expected header maps, if err!=nil - if err == nil { - mset.clseq++ - mset.trackReplicationTraffic(node, len(esm), r) - } + _ = node.Propose(esm) + // The proposal can fail, but we always account for trying. + mset.clseq++ + mset.trackReplicationTraffic(node, len(esm), r) // Check to see if we are being overrun. // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. diff --git a/server/stream.go b/server/stream.go index 1bf64daba42..1e6e936f44e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6461,7 +6461,8 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr rollback := func(seq uint64) { if isClustered { - // TODO(mvv): reset in-memory expected header maps + // Only need to move the clustered sequence back if the batch fails to commit. + // Other changes were staged but not applied, so this is the only thing we need to do. mset.clseq -= seq - 1 } mset.clMu.Unlock() @@ -6567,12 +6568,9 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr // Do a single multi proposal. This ensures we get to push all entries to the proposal queue in-order // and not interleaved with other proposals. diff.commit(mset) - if err := node.ProposeMulti(entries); err == nil { - mset.trackReplicationTraffic(node, sz, r) - } else { - // TODO(mvv): reset in-memory expected header maps - mset.clseq -= batchSeq - } + _ = node.ProposeMulti(entries) + // The proposal can fail, but we always account for trying. + mset.trackReplicationTraffic(node, sz, r) // Check to see if we are being overrun. // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.