Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9917,6 +9917,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
}
}

start := time.Now()
mset.setCatchupPeer(sreq.Peer, last-seq)

var spb int
Expand All @@ -9925,7 +9926,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
sendNextBatchAndContinue := func(qch chan struct{}) bool {
// Check if we know we will not enter the loop because we are done.
if seq > last {
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
s.Noticef("Catchup for stream '%s > %s' complete (took %v)", mset.account(), mset.name(), time.Since(start))
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
Expand Down Expand Up @@ -9994,7 +9995,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {

// See if we should use LoadNextMsg instead of walking sequence by sequence if we have an order magnitude more interior deletes.
// Only makes sense with delete range capabilities.
useLoadNext := drOk && (uint64(state.NumDeleted) > 10*state.Msgs)
useLoadNext := drOk && (uint64(state.NumDeleted) > 2*state.Msgs || state.NumDeleted > 1_000_000)

var smv StoreMsg
for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ {
Expand Down Expand Up @@ -10034,8 +10035,8 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// The snapshot has a larger last sequence then we have. This could be due to a truncation
// when trying to recover after corruption, still not 100% sure. Could be off by 1 too somehow,
// but tested a ton of those with no success.
s.Warnf("Catchup for stream '%s > %s' completed, but requested sequence %d was larger than current state: %+v",
mset.account(), mset.name(), seq, state)
s.Warnf("Catchup for stream '%s > %s' completed (took %v), but requested sequence %d was larger than current state: %+v",
mset.account(), mset.name(), time.Since(start), seq, state)
// Try our best to redo our invalidated snapshot as well.
if n := mset.raftNode(); n != nil {
if snap := mset.stateSnapshot(); snap != nil {
Expand Down Expand Up @@ -10081,7 +10082,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if drOk && dr.First > 0 {
sendDR()
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
s.Noticef("Catchup for stream '%s > %s' complete (took %v)", mset.account(), mset.name(), time.Since(start))
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
Expand Down