Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8885,7 +8885,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
)
diff := &batchStagedDiff{}
if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
// TODO(mvv): reset in-memory expected header maps
mset.clMu.Unlock()
if err == errMsgIdDuplicate && dseq > 0 {
var buf [256]byte
Expand Down Expand Up @@ -8916,12 +8915,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}

// Do proposal.
err = node.Propose(esm)
// TODO(mvv): reset in-memory expected header maps, if err!=nil
if err == nil {
mset.clseq++
mset.trackReplicationTraffic(node, len(esm), r)
}
_ = node.Propose(esm)
// The proposal can fail, but we always account for trying.
mset.clseq++
mset.trackReplicationTraffic(node, len(esm), r)

// Check to see if we are being overrun.
// TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
Expand Down
12 changes: 5 additions & 7 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6461,7 +6461,8 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr

rollback := func(seq uint64) {
if isClustered {
// TODO(mvv): reset in-memory expected header maps
// Only need to move the clustered sequence back if the batch fails to commit.
// Other changes were staged but not applied, so this is the only thing we need to do.
mset.clseq -= seq - 1
}
mset.clMu.Unlock()
Expand Down Expand Up @@ -6567,12 +6568,9 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
// Do a single multi proposal. This ensures we get to push all entries to the proposal queue in-order
// and not interleaved with other proposals.
diff.commit(mset)
if err := node.ProposeMulti(entries); err == nil {
mset.trackReplicationTraffic(node, sz, r)
} else {
// TODO(mvv): reset in-memory expected header maps
mset.clseq -= batchSeq
}
_ = node.ProposeMulti(entries)
// The proposal can fail, but we always account for trying.
mset.trackReplicationTraffic(node, sz, r)

// Check to see if we are being overrun.
// TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
Expand Down