From 9790d749d7665ed85871038601f15bb2a4cd92c0 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Mon, 25 Aug 2025 19:55:23 -0400 Subject: [PATCH 1/4] allocator: move isDecommissionAction to allocatorimpl This commit refactors isDecommissionAction into allocatorimpl for consistency with other similar helpers like allocatorActions.{Add,Replace,Remove}. This change has no behavior changes but to make future commits easier. --- pkg/kv/kvserver/allocator/allocatorimpl/allocator.go | 9 +++++++++ pkg/kv/kvserver/replicate_queue.go | 9 +-------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 05d0ad86505a..21e35f510481 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -163,6 +163,15 @@ func (a AllocatorAction) Remove() bool { a == AllocatorRemoveDecommissioningNonVoter } +// Decommissioning indicates an action replacing or removing a decommissioning +// replicas. +func (a AllocatorAction) Decommissioning() bool { + return a == AllocatorRemoveDecommissioningVoter || + a == AllocatorRemoveDecommissioningNonVoter || + a == AllocatorReplaceDecommissioningVoter || + a == AllocatorReplaceDecommissioningNonVoter +} + // TargetReplicaType returns that the action is for a voter or non-voter replica. func (a AllocatorAction) TargetReplicaType() TargetReplicaType { var t TargetReplicaType diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 9bfac52f27f7..5e63ba7c3dcc 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -923,19 +923,12 @@ func (rq *replicateQueue) processOneChange( } func maybeAnnotateDecommissionErr(err error, action allocatorimpl.AllocatorAction) error { - if err != nil && isDecommissionAction(action) { + if err != nil && action.Decommissioning() { err = decommissionPurgatoryError{err} } return err } -func isDecommissionAction(action allocatorimpl.AllocatorAction) bool { - return action == allocatorimpl.AllocatorRemoveDecommissioningVoter || - action == allocatorimpl.AllocatorRemoveDecommissioningNonVoter || - action == allocatorimpl.AllocatorReplaceDecommissioningVoter || - action == allocatorimpl.AllocatorReplaceDecommissioningNonVoter -} - // shedLease takes in a leaseholder replica, looks for a target for transferring // the lease and, if a suitable target is found (e.g. alive, not draining), // transfers the lease away. From cedad113f9ce22d67a62a057f42f5bdee35c44fa Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Mon, 25 Aug 2025 20:59:34 -0400 Subject: [PATCH 2/4] kvserver: simplify logging in maybeEnqueueProblemRange This commit simplifies the logging in `maybeEnqueueProblemRange` to log two booleans directly. --- pkg/kv/kvserver/replica.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 7019f62e0208..adcdc2685519 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2930,15 +2930,8 @@ func (r *Replica) maybeEnqueueProblemRange( if !isLeaseholder || !leaseValid { // The replicate queue will not process the replica without a valid lease. // Track when we skip enqueuing for these reasons. - boolToInt := func(b bool) int { - if b { - return 1 - } - return 0 - } - reasons := []string{"is not the leaseholder", "the lease is not valid"} - reason := reasons[boolToInt(isLeaseholder)] - log.KvDistribution.VInfof(ctx, 1, "not enqueuing replica %s because %s", r.Desc(), reason) + log.KvDistribution.Infof(ctx, "not enqueuing replica %s because isLeaseholder=%t, leaseValid=%t", + r.Desc(), isLeaseholder, leaseValid) r.store.metrics.DecommissioningNudgerNotLeaseholderOrInvalidLease.Inc(1) return } From 8c80097877aa16c6d7d0cf91512c50fa928d90c5 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Mon, 25 Aug 2025 20:19:55 -0400 Subject: [PATCH 3/4] kvserver: fix comment when dropping due to exceeding size Previously, the comment on the queue incorrectly stated that it removes the lowest-priority element when exceeding its maximum size. This was misleading because heap only guarantees that the root is the highest priority, not that elements are globally ordered. This commit updates the comment to clarify that the removed element might not be the lowest priority. Ideally, we should drop the lowest priority element when exceeding queue size, but heap doesn't make this very easy. --- pkg/kv/kvserver/queue.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index 8270e7c02577..c43e0caca3f5 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -766,8 +766,11 @@ func (bq *baseQueue) addInternal( item = &replicaItem{rangeID: desc.RangeID, replicaID: replicaID, priority: priority} bq.addLocked(item) - // If adding this replica has pushed the queue past its maximum size, - // remove the lowest priority element. + // If adding this replica has pushed the queue past its maximum size, remove + // an element. Note that it might not be the lowest priority since heap is not + // guaranteed to be globally ordered. Ideally, we would remove the lowest + // priority element, but it would require additional bookkeeping or a linear + // scan. if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize { bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1]) } From ed6c55ee5ddc9e8b422522a462b106c4d6848651 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Tue, 26 Aug 2025 08:32:51 -0400 Subject: [PATCH 4/4] kvserver: add logging for ranges dropped from base queue This commit adds logging for ranges dropped from the base queue due to exceeding max size, improving observability. The log is gated behind V(1) to avoid verbosity on nodes with many ranges. --- pkg/kv/kvserver/queue.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index c43e0caca3f5..0b845d6b7ad7 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -772,7 +772,10 @@ func (bq *baseQueue) addInternal( // priority element, but it would require additional bookkeeping or a linear // scan. if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize { - bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1]) + replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1] + log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v", + priority, replicaItemToDrop.replicaID) + bq.removeLocked(replicaItemToDrop) } // Signal the processLoop that a replica has been added. select {