diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 05d0ad86505a..21e35f510481 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -163,6 +163,15 @@ func (a AllocatorAction) Remove() bool { a == AllocatorRemoveDecommissioningNonVoter } +// Decommissioning indicates an action replacing or removing a decommissioning +// replicas. +func (a AllocatorAction) Decommissioning() bool { + return a == AllocatorRemoveDecommissioningVoter || + a == AllocatorRemoveDecommissioningNonVoter || + a == AllocatorReplaceDecommissioningVoter || + a == AllocatorReplaceDecommissioningNonVoter +} + // TargetReplicaType returns that the action is for a voter or non-voter replica. func (a AllocatorAction) TargetReplicaType() TargetReplicaType { var t TargetReplicaType diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index 8270e7c02577..0b845d6b7ad7 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -766,10 +766,16 @@ func (bq *baseQueue) addInternal( item = &replicaItem{rangeID: desc.RangeID, replicaID: replicaID, priority: priority} bq.addLocked(item) - // If adding this replica has pushed the queue past its maximum size, - // remove the lowest priority element. + // If adding this replica has pushed the queue past its maximum size, remove + // an element. Note that it might not be the lowest priority since heap is not + // guaranteed to be globally ordered. Ideally, we would remove the lowest + // priority element, but it would require additional bookkeeping or a linear + // scan. if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize { - bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1]) + replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1] + log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v", + priority, replicaItemToDrop.replicaID) + bq.removeLocked(replicaItemToDrop) } // Signal the processLoop that a replica has been added. select { diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 7019f62e0208..adcdc2685519 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2930,15 +2930,8 @@ func (r *Replica) maybeEnqueueProblemRange( if !isLeaseholder || !leaseValid { // The replicate queue will not process the replica without a valid lease. // Track when we skip enqueuing for these reasons. - boolToInt := func(b bool) int { - if b { - return 1 - } - return 0 - } - reasons := []string{"is not the leaseholder", "the lease is not valid"} - reason := reasons[boolToInt(isLeaseholder)] - log.KvDistribution.VInfof(ctx, 1, "not enqueuing replica %s because %s", r.Desc(), reason) + log.KvDistribution.Infof(ctx, "not enqueuing replica %s because isLeaseholder=%t, leaseValid=%t", + r.Desc(), isLeaseholder, leaseValid) r.store.metrics.DecommissioningNudgerNotLeaseholderOrInvalidLease.Inc(1) return } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 9bfac52f27f7..5e63ba7c3dcc 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -923,19 +923,12 @@ func (rq *replicateQueue) processOneChange( } func maybeAnnotateDecommissionErr(err error, action allocatorimpl.AllocatorAction) error { - if err != nil && isDecommissionAction(action) { + if err != nil && action.Decommissioning() { err = decommissionPurgatoryError{err} } return err } -func isDecommissionAction(action allocatorimpl.AllocatorAction) bool { - return action == allocatorimpl.AllocatorRemoveDecommissioningVoter || - action == allocatorimpl.AllocatorRemoveDecommissioningNonVoter || - action == allocatorimpl.AllocatorReplaceDecommissioningVoter || - action == allocatorimpl.AllocatorReplaceDecommissioningNonVoter -} - // shedLease takes in a leaseholder replica, looks for a target for transferring // the lease and, if a suitable target is found (e.g. alive, not draining), // transfers the lease away.