diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 800d190703d..c31dd02d593 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -9917,6 +9917,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { } } + start := time.Now() mset.setCatchupPeer(sreq.Peer, last-seq) var spb int @@ -9925,7 +9926,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { sendNextBatchAndContinue := func(qch chan struct{}) bool { // Check if we know we will not enter the loop because we are done. if seq > last { - s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) + s.Noticef("Catchup for stream '%s > %s' complete (took %v)", mset.account(), mset.name(), time.Since(start)) // EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) return false @@ -9994,7 +9995,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // See if we should use LoadNextMsg instead of walking sequence by sequence if we have an order magnitude more interior deletes. // Only makes sense with delete range capabilities. - useLoadNext := drOk && (uint64(state.NumDeleted) > 10*state.Msgs) + useLoadNext := drOk && (uint64(state.NumDeleted) > 2*state.Msgs || state.NumDeleted > 1_000_000) var smv StoreMsg for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ { @@ -10034,8 +10035,8 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // The snapshot has a larger last sequence then we have. This could be due to a truncation // when trying to recover after corruption, still not 100% sure. Could be off by 1 too somehow, // but tested a ton of those with no success. - s.Warnf("Catchup for stream '%s > %s' completed, but requested sequence %d was larger than current state: %+v", - mset.account(), mset.name(), seq, state) + s.Warnf("Catchup for stream '%s > %s' completed (took %v), but requested sequence %d was larger than current state: %+v", + mset.account(), mset.name(), time.Since(start), seq, state) // Try our best to redo our invalidated snapshot as well. if n := mset.raftNode(); n != nil { if snap := mset.stateSnapshot(); snap != nil { @@ -10081,7 +10082,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { if drOk && dr.First > 0 { sendDR() } - s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) + s.Noticef("Catchup for stream '%s > %s' complete (took %v)", mset.account(), mset.name(), time.Since(start)) // EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) return false