Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8c4f0bd
kvserver: add logging & metrics for decommissioning nudger
dodeca12 Aug 19, 2025
fbd4d45
allocator: move isDecommissionAction to allocatorimpl
wenyihu6 Aug 25, 2025
f1fcfb6
kvserver: simplify logging in maybeEnqueueProblemRange
wenyihu6 Aug 26, 2025
abb0d68
kvserver: fix comment when dropping due to exceeding size
wenyihu6 Aug 26, 2025
07a1694
kvserver: add logging for ranges dropped from base queue
wenyihu6 Aug 26, 2025
e8a3906
kvserver: plumb enqueue time priority
wenyihu6 Aug 22, 2025
ead1c7e
kvserver: remove priority reset during setProcessing
wenyihu6 Aug 22, 2025
ab8b761
kvserver: plumb priority at enqueue for purgatory queue
wenyihu6 Aug 22, 2025
42066dc
allocatorimpl: adds a priority assertion to computeAction
wenyihu6 Aug 26, 2025
602982c
allocatorimpl: add invariants on priority to base queue tests
wenyihu6 Aug 26, 2025
5bccff0
allocator: correct logging for priority assertion
wenyihu6 Aug 28, 2025
c0a4ec7
kvserver: remove bq.replicaCanBeProcessed right before bq.processReplica
wenyihu6 Aug 26, 2025
755a74b
kvserver: add ReplicateQueueDroppedDueToSize
wenyihu6 Aug 29, 2025
01c4c41
kvserver: add ReplicateQueueMaxSize
wenyihu6 Aug 29, 2025
7afd957
kvserver: add TestReplicateQueueMaxSize
wenyihu6 Aug 29, 2025
6c02ea6
kvserver: drop excess replicas when lowering ReplicateQueueMaxSize
wenyihu6 Aug 29, 2025
ca42283
kvserver: rename ReplicateQueueDroppedDueToSize to ReplicateQueueFull
wenyihu6 Aug 29, 2025
0a2a5a9
kvserver: add PriorityInversionRequeue
wenyihu6 Aug 27, 2025
8887a72
kvserver: requeue on priority inversion for replicate queue
wenyihu6 Aug 27, 2025
761aebc
kvserver: use priorityInversionLogEveryN
wenyihu6 Aug 28, 2025
35137ac
kvserver: improve comments for PriorityInversionRequeue
wenyihu6 Aug 29, 2025
72edb88
allocator: small refactor for CheckPriorityInversion
wenyihu6 Aug 29, 2025
215629e
allocator: add TestAllocatorPriorityInvariance
wenyihu6 Aug 29, 2025
89e7329
kvserver: guard inversion check and requeue behind PriorityInversionR…
wenyihu6 Aug 29, 2025
af1b99c
kvserver: move priority inversion check before applyChange
wenyihu6 Sep 1, 2025
01b6241
kvserver: check for requeue before error checking in rq.process
wenyihu6 Sep 2, 2025
b991142
kvserver: use non-blocking send on errors for maybeBackpressureBatch
wenyihu6 Aug 31, 2025
d823cdc
kvserver: return baseQueueAsyncRateLimited from bq.Async
wenyihu6 Aug 29, 2025
1d2e4c8
kvserver: add onProcessResult and onEnqueueResult to processCallback
wenyihu6 Aug 28, 2025
cb9ee71
kvserver: add TestBaseQueueCallback
wenyihu6 Aug 30, 2025
ef48cb0
kvserver: better comments for on processCallback
wenyihu6 Sep 2, 2025
d6b63f1
kvserver: treat priority update as a success with onEnqueueResult
wenyihu6 Sep 2, 2025
c7c06ea
kvserver: rename processCallback processCallback to cb processCallback
wenyihu6 Sep 2, 2025
ff59398
kvserver: call cb.onEnqueueResult in defer on errors
wenyihu6 Sep 2, 2025
52b461b
fixup! kvserver: treat priority update as a success with onEnqueueResult
wenyihu6 Sep 2, 2025
cd44a09
kvserver: allow logs from callbacks up to 15 replicas per updateRepli…
wenyihu6 Sep 2, 2025
b04d205
kvserver: rename shouldLog to maybeLog and change vlevel to a var
wenyihu6 Sep 3, 2025
b669c80
kvserver: improve observability with decommission nudger
wenyihu6 Aug 31, 2025
b61dc8a
kvserver: add enqueue metrics to base queue
wenyihu6 Sep 3, 2025
a15adf1
kvserver: move bq.enqueueAdd update to be outside of defer
wenyihu6 Sep 3, 2025
8b07f87
kvserver: test metrics in TestBaseQueueCallback* and TestReplicateQue…
wenyihu6 Sep 3, 2025
608060b
kvserver: track priority inversion in replicate queue metrics
wenyihu6 Aug 27, 2025
81d63db
kvserver: add TestPriorityInversionRequeue
wenyihu6 Aug 27, 2025
7ee07bc
kvserver: delete per action priority inversion metrics
wenyihu6 Sep 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13847,6 +13847,38 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.enqueue.add
exported_name: queue_replicate_enqueue_add
description: Number of replicas successfully added to the replicate queue
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.enqueue.failedprecondition
exported_name: queue_replicate_enqueue_failedprecondition
description: Number of replicas that failed the precondition checks and were therefore not added to the replicate queue
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.enqueue.noaction
exported_name: queue_replicate_enqueue_noaction
description: Number of replicas for which ShouldQueue determined no action was needed and were therefore not added to the replicate queue
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.enqueue.unexpectederror
exported_name: queue_replicate_enqueue_unexpectederror
description: Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to add to the replicate queue directly), but failed to be enqueued due to unexpected errors
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.nonvoterpromotions
exported_name: queue_replicate_nonvoterpromotions
description: Number of non-voters promoted to voters by the replicate queue
Expand All @@ -13863,6 +13895,22 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NONE
- name: queue.replicate.priority_inversion.requeue
exported_name: queue_replicate_priority_inversion_requeue
description: Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time. When the priority has changed from a high priority repair action to rebalance, the change is requeued to avoid unfairness.
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.total
exported_name: queue_replicate_priority_inversion_total
description: Total number of priority inversions in the replicate queue. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.process.failure
exported_name: queue_replicate_process_failure
description: Number of replicas which failed processing in the replicate queue
Expand Down Expand Up @@ -13895,6 +13943,14 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NONE
- name: queue.replicate.queue_full
exported_name: queue_replicate_queue_full
description: Number of times a replica was dropped from the queue due to queue fullness
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.rebalancenonvoterreplica
exported_name: queue_replicate_rebalancenonvoterreplica
description: Number of non-voter replica rebalancer-initiated additions attempted by the replicate queue
Expand Down Expand Up @@ -15269,6 +15325,60 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NONE
- name: ranges.decommissioning.nudger.enqueue
exported_name: ranges_decommissioning_nudger_enqueue
labeled_name: 'ranges.decommissioning.nudger.enqueue{status: enqueue}'
description: 'Number of attempts by the decommissioning nudger to enqueue a range for decommissioning. Note: This metric tracks when the nudger attempts to enqueue, but the replica might not end up being enqueued by the priority queue due to various filtering or failure conditions.'
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.enqueue.failure
exported_name: ranges_decommissioning_nudger_enqueue_failure
labeled_name: ranges.decommissioning.nudger.enqueue.failure
description: Number of ranges that failed to enqueue at the replicate queue
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.enqueue.success
exported_name: ranges_decommissioning_nudger_enqueue_success
labeled_name: ranges.decommissioning.nudger.enqueue.success
description: Number of ranges that were successfully enqueued by the decommissioning nudger
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease
exported_name: ranges_decommissioning_nudger_not_leaseholder_or_invalid_lease
labeled_name: ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease
description: Number of ranges that were not the leaseholder or had an invalid lease at the decommissioning nudger
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.process.failure
exported_name: ranges_decommissioning_nudger_process_failure
labeled_name: ranges.decommissioning.nudger.process.failure
description: Number of ranges enqueued by the decommissioning nudger that failed to be processed by the replicate queue
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.process.success
exported_name: ranges_decommissioning_nudger_process_success
labeled_name: ranges.decommissioning.nudger.process.success
description: Number of ranges enqueued by the decommissioning nudger that were successfully processed by the replicate queue
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.overreplicated
exported_name: ranges_overreplicated
description: Number of ranges with more live replicas than the replication target
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/stop",
Expand Down
152 changes: 150 additions & 2 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -138,6 +139,7 @@ const (
AllocatorConsiderRebalance
AllocatorRangeUnavailable
AllocatorFinalizeAtomicReplicationChange
AllocatorMaxPriority
)

// Add indicates an action adding a replica.
Expand All @@ -163,6 +165,15 @@ func (a AllocatorAction) Remove() bool {
a == AllocatorRemoveDecommissioningNonVoter
}

// Decommissioning reports whether the action replaces or removes a
// decommissioning replica, for either a voter or a non-voter.
func (a AllocatorAction) Decommissioning() bool {
	switch a {
	case AllocatorRemoveDecommissioningVoter,
		AllocatorRemoveDecommissioningNonVoter,
		AllocatorReplaceDecommissioningVoter,
		AllocatorReplaceDecommissioningNonVoter:
		return true
	default:
		return false
	}
}

// TargetReplicaType returns that the action is for a voter or non-voter replica.
func (a AllocatorAction) TargetReplicaType() TargetReplicaType {
var t TargetReplicaType
Expand Down Expand Up @@ -242,10 +253,27 @@ func (a AllocatorAction) SafeValue() {}
// range. Within a given range, the ordering of the various checks inside
// `Allocator.computeAction` determines which repair/rebalancing actions are
// taken before the others.
//
// NB: Priorities should be non-negative and should be spaced in multiples of
// 100 unless you believe they should belong to the same priority category.
// AllocatorNoop should have the lowest priority. CheckPriorityInversion depends
// on this contract. In most cases, the allocator returns a priority that
// matches the definitions below. For AllocatorAddVoter,
// AllocatorRemoveDeadVoter, and AllocatorRemoveVoter, the priority may be
// adjusted (see ComputeAction for details), but the adjustment is expected to
// be small (<49).
//
// Exceptions: AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner,
// and AllocatorReplaceDeadVoter violate the spacing of 100. These cases
// predate this comment, so we allow them as they belong to the same general
// priority category.
func (a AllocatorAction) Priority() float64 {
const maxPriority = 12002
switch a {
case AllocatorMaxPriority:
return maxPriority
case AllocatorFinalizeAtomicReplicationChange:
return 12002
return maxPriority
case AllocatorRemoveLearner:
return 12001
case AllocatorReplaceDeadVoter:
Expand Down Expand Up @@ -946,10 +974,68 @@ func (a *Allocator) ComputeAction(
return action, action.Priority()
}

return a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
action, priority = a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
desc.Replicas().NonVoterDescriptors())
// Ensure that priority is never -1. Typically, computeAction returns
// action.Priority(), but we sometimes modify the priority for specific
// actions like AllocatorAddVoter, AllocatorRemoveDeadVoter, and
// AllocatorRemoveVoter. A priority of -1 is a special case, indicating that
// the caller expects the processing logic to be invoked even if there's a
// priority inversion. If the priority is not -1, the range might be re-queued
// to be processed with the correct priority.
if priority == -1 {
if buildutil.CrdbTestBuild {
log.Fatalf(ctx, "allocator returned -1 priority for range %s: %v", desc, action)
} else {
log.Warningf(ctx, "allocator returned -1 priority for range %s: %v", desc, action)
}
}
return action, priority
}

// computeAction determines the action to take on a range along with its
// priority.
//
// NB: The returned priority may include a small adjustment and therefore might
// not exactly match action.Priority(). See AllocatorAddVoter,
// AllocatorRemoveDeadVoter, AllocatorRemoveVoter below. The adjustment should
// be <49 with two assumptions below. New uses on this contract should be
// avoided since the assumptions are not strong guarantees (especially the
// second one).
//
// The claim that the adjustment is < 49 has two assumptions:
// 1. min(num_replicas,total_nodes) in zone configuration is < 98.
// 2. when ranges are not under-replicated, the difference between
// min(num_replicas,total_nodes)/2-1 and existing_replicas is < 49.
//
// neededVoters <= min(num_replicas,total_nodes)
// desiredQuorum = neededVoters/2-1
// quorum = haveVoters/2-1
//
// For AllocatorAddVoter, we know haveVoters < neededVoters
// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
// To find the worst case (largest adjustment),
// 1. haveVoters = neededVoters-1,
// adjustment = neededVoters/2-1-(neededVoters-1)
// = neededVoters/2-neededVoters = -neededVoters/2
// 2. haveVoters = 0
// adjustment = neededVoters/2-1
//
// In order for adjustment to be <49, neededVoters/2<49 => neededVoters<98.
// Hence the first assumption.
//
// For AllocatorRemoveDeadVoter, we know haveVoters >= neededVoters
// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
// To find the worst case (largest adjustment),
// 1. neededVoters/2-1 is much larger than haveVoters: given haveVoters >=
// neededVoters, haveVoters/2-1 >= neededVoters/2-1. So this case is impossible.
// 2. neededVoters/2-1 is much smaller than haveVoters: since ranges could be
// over-replicated, theoretically speaking, there may be no upper bounds on
// haveVoters. In order for adjustment to be < 49, we can only make an
// assumption here that the difference between neededVoters/2-1 and haveVoters
// cannot be >= 49 in this case.
//
// For AllocatorRemoveVoter, adjustment is haveVoters%2 = 0 or 1 < 49.
func (a *Allocator) computeAction(
ctx context.Context,
storePool storepool.AllocatorStorePool,
Expand Down Expand Up @@ -3220,3 +3306,65 @@ func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
}
return ret
}

// roundToNearestPriorityCategory snaps a priority onto the closest multiple of
// 100, which is the spacing used between priority categories. The input n is
// expected to be non-negative.
func roundToNearestPriorityCategory(n float64) float64 {
	const categoryWidth = 100.0
	// Express n in units of whole categories, round to the nearest one, then
	// scale back up to a priority value.
	nearestCategory := math.Round(n / categoryWidth)
	return nearestCategory * categoryWidth
}

// CheckPriorityInversion compares the priority a range was enqueued with
// against the action the allocator computed at processing time.
//
// It returns:
//   - isInversion: true when the enqueue-time priority is higher than the
//     priority of actionAtProcessing. Processing such a range now could starve
//     higher-priority items, so the caller should not process it at this time.
//   - shouldRequeue: true when the caller should re-add the range to the queue
//     (presumably under its new priority). This is only the case when the
//     range was enqueued above AllocatorConsiderRebalance's priority and the
//     action at processing time is AllocatorConsiderRebalance.
//
// Note: going from AllocatorRangeUnavailable/AllocatorNoop to
// AllocatorConsiderRebalance is not treated as a priority inversion. Going
// from a repair action to AllocatorRangeUnavailable/AllocatorNoop is reported
// as an inversion, but with shouldRequeue = false.
//
// INVARIANT: shouldRequeue => isInversion
func CheckPriorityInversion(
	priorityAtEnqueue float64, actionAtProcessing AllocatorAction,
) (isInversion bool, shouldRequeue bool) {
	// NB: a priority of -1 is used by callers such as scatter, dry runs, and
	// manual queue runs. Priority inversion does not apply to those calls.
	if priorityAtEnqueue == -1 {
		return false, false
	}

	// NB: only priorities that fall inside the span of the allocator actions
	// are considered. store.Enqueue might enqueue things with a very high
	// priority (1e5); those are neither requeued nor counted as inversions.
	lowest, highest := AllocatorNoop.Priority(), AllocatorMaxPriority.Priority()
	if inAllocatorRange := lowest <= priorityAtEnqueue && priorityAtEnqueue <= highest; !inAllocatorRange {
		return false, false
	}

	// Enqueued above the rebalance priority but now only a rebalance is being
	// considered: report the inversion and ask for a requeue.
	if actionAtProcessing == AllocatorConsiderRebalance &&
		priorityAtEnqueue > AllocatorConsiderRebalance.Priority() {
		return true, true
	}

	// NB: the enqueue-time priority usually equals action.Priority(), but for
	// AllocatorAddVoter, AllocatorRemoveDeadVoter, and AllocatorRemoveVoter it
	// can carry a small adjustment (expected to be <49; see ComputeAction).
	// Rounding to the nearest 100 strips that adjustment so that, e.g., an
	// adjusted enqueue priority is not reported as an inversion against the
	// unadjusted priority of the very same action. As a side effect,
	// AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner, and
	// AllocatorReplaceDeadVoter round to the same value; they are close enough
	// that moving among them is not counted as an inversion.
	return roundToNearestPriorityCategory(priorityAtEnqueue) > actionAtProcessing.Priority(), false
}
Loading