125 changes: 124 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -139,6 +139,7 @@ const (
AllocatorConsiderRebalance
AllocatorRangeUnavailable
AllocatorFinalizeAtomicReplicationChange
AllocatorMaxPriority
)

// Add indicates an action adding a replica.
@@ -252,10 +253,27 @@ func (a AllocatorAction) SafeValue() {}
// range. Within a given range, the ordering of the various checks inside
// `Allocator.computeAction` determines which repair/rebalancing actions are
// taken before the others.
//
// NB: Priorities should be non-negative and should be spaced in multiples of
// 100 unless you believe they should belong to the same priority category.
// AllocatorNoop should have the lowest priority. CheckPriorityInversion depends
// on this contract. In most cases, the allocator returns a priority that
// matches the definitions below. For AllocatorAddVoter,
// AllocatorRemoveDeadVoter, and AllocatorRemoveVoter, the priority may be
// adjusted (see ComputeAction for details), but the adjustment is expected to
// be small (<49).
//
// Exceptions: AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner,
// and AllocatorReplaceDeadVoter violate the 100-spacing rule. These cases
// predate this comment, and we allow them because they belong to the same
// general priority category.
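// For example, 12000, 12001, and 12002 are treated as one category by
// CheckPriorityInversion (they round to the same multiple of 100), while
// 10000 and 12000 fall into different categories.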
func (a AllocatorAction) Priority() float64 {
const maxPriority = 12002
switch a {
case AllocatorMaxPriority:
return maxPriority
case AllocatorFinalizeAtomicReplicationChange:
return maxPriority // was: return 12002
case AllocatorRemoveLearner:
return 12001
case AllocatorReplaceDeadVoter:
@@ -975,6 +993,49 @@ func (a *Allocator) ComputeAction(
return action, priority
}

// computeAction determines the action to take on a range along with its
// priority.
//
// NB: The returned priority may include a small adjustment and therefore might
// not exactly match action.Priority(). See AllocatorAddVoter,
// AllocatorRemoveDeadVoter, and AllocatorRemoveVoter below. The adjustment
// should be < 49 under the two assumptions below. New uses of this contract
// should be avoided, since the assumptions are not strong guarantees
// (especially the second one).
//
// The claim that the adjustment is < 49 rests on two assumptions:
// 1. min(num_replicas,total_nodes) in the zone configuration is < 98.
// 2. When ranges are not under-replicated, the difference between
// min(num_replicas,total_nodes)/2-1 and existing_replicas is < 49.
//
// neededVoters <= min(num_replicas,total_nodes)
// desiredQuorum = neededVoters/2-1
// quorum = haveVoters/2-1
//
// For AllocatorAddVoter, we know haveVoters < neededVoters:
// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
// To find the worst case (largest adjustment),
// 1. haveVoters = neededVoters-1:
// adjustment = neededVoters/2-1-(neededVoters-1)
// = neededVoters/2-neededVoters = -neededVoters/2
// 2. haveVoters = 0:
// adjustment = neededVoters/2-1
//
// In order for the adjustment to be < 49, we need neededVoters/2 < 49, i.e.
// neededVoters < 98. Hence the first assumption.
//
// For AllocatorRemoveDeadVoter, we know haveVoters >= neededVoters:
// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
// To find the worst case (largest adjustment),
// 1. neededVoters/2-1 is much larger than haveVoters: impossible, since
// haveVoters >= neededVoters > neededVoters/2-1.
// 2. neededVoters/2-1 is much smaller than haveVoters: since ranges can be
// over-replicated, there is theoretically no upper bound on haveVoters. For
// the adjustment to stay < 49, we can only assume that the difference between
// neededVoters/2-1 and haveVoters is < 49 in this case. Hence the second
// assumption.
//
// For AllocatorRemoveVoter, the adjustment is haveVoters%2, which is 0 or 1,
// both < 49.
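//
// As a concrete illustration (integer division throughout): with
// neededVoters = 3 and haveVoters = 1, the AllocatorAddVoter adjustment is
// 3/2-1-1 = -1; with neededVoters = 3 and haveVoters = 5, the
// AllocatorRemoveDeadVoter adjustment is 3/2-1-5 = -5. Both are well within
// the < 49 bound assumed above.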
func (a *Allocator) computeAction(
ctx context.Context,
storePool storepool.AllocatorStorePool,
@@ -3245,3 +3306,65 @@ func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
}
return ret
}

// roundToNearestPriorityCategory rounds a priority to the nearest 100. n should
// be non-negative.
func roundToNearestPriorityCategory(n float64) float64 {
return math.Round(n/100.0) * 100
}

// CheckPriorityInversion returns whether there was a priority inversion (and
// the range should not be processed at this time, since doing so could starve
// higher-priority items), and whether the caller should re-add the range to the
// queue (presumably under its new priority). A priority inversion happens if
// the priority at enqueue time is higher than the priority corresponding to the
// action computed at processing time. The caller should re-add the range to
// the queue if it has gone from a repair action to the lowest-priority action
// (AllocatorConsiderRebalance).
//
// Note: Changing from AllocatorRangeUnavailable/AllocatorNoop to
// AllocatorConsiderRebalance is not treated as a priority inversion. Going from
// a repair action to AllocatorRangeUnavailable/AllocatorNoop is considered a
// priority inversion but shouldRequeue = false.
//
// INVARIANT: shouldRequeue => isInversion
func CheckPriorityInversion(
priorityAtEnqueue float64, actionAtProcessing AllocatorAction,
) (isInversion bool, shouldRequeue bool) {
// NB: priorityAtEnqueue is -1 for callers such as scatter, dry runs, and
// manual queue runs. Priority inversion does not apply to these calls.
if priorityAtEnqueue == -1 {
return false, false
}

// NB: we need to check whether priorityAtEnqueue falls within the range
// of the allocator actions because store.Enqueue might enqueue things with
// a very high priority (1e5). In those cases, we do not want to requeue
// these actions or count them as inversions.
withinPriorityRange := func(priority float64) bool {
return AllocatorNoop.Priority() <= priority && priority <= AllocatorMaxPriority.Priority()
}
if !withinPriorityRange(priorityAtEnqueue) {
return false, false
}

if priorityAtEnqueue > AllocatorConsiderRebalance.Priority() && actionAtProcessing == AllocatorConsiderRebalance {
return true, true
}

// NB: Usually, the priority at enqueue time should correspond to
// action.Priority(). However, for AllocatorAddVoter,
// AllocatorRemoveDeadVoter, and AllocatorRemoveVoter, the priority can be
// adjusted at enqueue time (see ComputeAction for more details). We expect
// the adjustment to be relatively small (<49), so we round the priority to
// the nearest 100 before comparing it against actionAtProcessing.Priority().
// Without this rounding, we might treat going from 10000 to 999 as an
// inversion when it was really just due to the adjustment. Note that the
// priorities of AllocatorFinalizeAtomicReplicationChange,
// AllocatorRemoveLearner, and AllocatorReplaceDeadVoter round to the same
// value; they are so close to each other that we don't count inversions among
// them.
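// For example, an enqueue priority of 10002 rounds to 10000, and 999 rounds
// to 1000, so a small enqueue-time adjustment does not push a priority into a
// different category than its action's base priority.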
normPriorityAtEnqueue := roundToNearestPriorityCategory(priorityAtEnqueue)
[Review thread]

Reviewer (Member):
Can you update the priority enum with a hard-to-miss message that priorities should be "separated" by multiples of 100 (explaining why) and also update the places where we adjust the priority to explain why each adjustment is ok?

@wenyihu6 (Contributor Author), Aug 29, 2025:
Added an NB comment and a regression unit test. I noticed that we are making some assumptions with this statement, but I think the assumptions should be true. (Update: After some thinking, I don't know if I can say the second assumption should be true. We can revisit them.)

@wenyihu6 (Contributor Author), Aug 31, 2025:
Update: After some thinking, I don't know if I can say the assumptions (especially the second one) should be true. I should revisit them if strong guarantees are required. It should be fine for now, since priority-inversion re-queuing only cares about repair actions vs. consider-rebalance. I added a comment to warn against new uses.

Reviewer (Member):
You're saying that with sufficiently high replication factors, some of the priorities may be adjusted by more than 50, right? I can live with this; add a comment about it and let's move on.
isInversion = normPriorityAtEnqueue > actionAtProcessing.Priority()
return isInversion, false
}
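
For context, here is a minimal sketch of how a caller might consult CheckPriorityInversion before processing a range. This is not the actual replicate queue code; the package placement and the names maybeProcess, requeue, and process are hypothetical and used only for illustration.

package kvserver

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
)

// maybeProcess skips processing when a priority inversion is detected and,
// when appropriate, re-adds the range to the queue under its new priority.
func maybeProcess(
	ctx context.Context,
	priorityAtEnqueue float64,
	actionAtProcessing allocatorimpl.AllocatorAction,
	requeue func(priority float64),
	process func(ctx context.Context) error,
) error {
	isInversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(
		priorityAtEnqueue, actionAtProcessing)
	if isInversion {
		// Processing now could starve higher-priority items, so skip.
		if shouldRequeue {
			// The action has dropped to AllocatorConsiderRebalance; re-add the
			// range under its new, lower priority.
			requeue(actionAtProcessing.Priority())
		}
		return nil
	}
	return process(ctx)
}

Note that requeue uses the unadjusted actionAtProcessing.Priority(), which matches the "presumably under its new priority" note in the CheckPriorityInversion doc comment.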
178 changes: 178 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -9629,3 +9629,181 @@ func TestAllocatorRebalanceTargetVoterConstraintUnsatisfied(t *testing.T) {
})
}
}

// TestRoundToNearestPriorityCategory tests the roundToNearestPriorityCategory
// function.
func TestRoundToNearestPriorityCategory(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
name string
input float64
expected float64
}{
{
name: "zero",
input: 0.0,
expected: 0.0,
},
{
name: "exact multiple of 100",
input: 100.0,
expected: 100.0,
},
{
name: "round down to nearest 100",
input: 149.0,
expected: 100.0,
},
{
name: "round up to nearest 100",
input: 151.0,
expected: 200.0,
},
{
name: "negative exact multiple of 100",
input: -200.0,
expected: -200.0,
},
{
name: "negative round down to nearest 100",
input: -249.0,
expected: -200.0,
},
{
name: "negative round up to nearest 100",
input: -251.0,
expected: -300.0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expected, roundToNearestPriorityCategory(tc.input))
})
}
}

// TestCheckPriorityInversion tests the CheckPriorityInversion function.
func TestCheckPriorityInversion(t *testing.T) {
defer leaktest.AfterTest(t)()

for action := AllocatorNoop; action <= AllocatorFinalizeAtomicReplicationChange; action++ {
t.Run(action.String(), func(t *testing.T) {
if action == AllocatorConsiderRebalance || action == AllocatorNoop || action == AllocatorRangeUnavailable {
inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
require.False(t, inversion)
require.False(t, requeue)
} else {
inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
require.True(t, inversion)
require.True(t, requeue)
}
})
}

testCases := []struct {
name string
priorityAtEnqueue float64
actionAtProcessing AllocatorAction
expectedInversion bool
expectedRequeue bool
}{
{
name: "AllocatorNoop at processing is noop",
priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
actionAtProcessing: AllocatorNoop,
expectedInversion: true,
expectedRequeue: false,
},
{
name: "AllocatorRangeUnavailable at processing is noop",
priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
actionAtProcessing: AllocatorRangeUnavailable,
expectedInversion: true,
expectedRequeue: false,
},
{
name: "priority -1 bypasses",
priorityAtEnqueue: -1,
actionAtProcessing: AllocatorConsiderRebalance,
expectedInversion: false,
expectedRequeue: false,
},
{
name: "priority increase",
priorityAtEnqueue: 0,
actionAtProcessing: AllocatorFinalizeAtomicReplicationChange,
expectedInversion: false,
expectedRequeue: false,
},
{
name: "above range priority(1e5)",
priorityAtEnqueue: 1e5,
actionAtProcessing: AllocatorConsiderRebalance,
expectedInversion: false,
expectedRequeue: false,
},
{
name: "below range priority at -10",
priorityAtEnqueue: -10,
actionAtProcessing: -100,
expectedInversion: false,
expectedRequeue: false,
},
{
name: "inversion but small priority changes",
priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
actionAtProcessing: AllocatorReplaceDecommissioningNonVoter,
expectedInversion: true,
expectedRequeue: false,
},
{
name:               "inversion without requeue: remove dead voter to add non-voter",
priorityAtEnqueue: AllocatorRemoveDeadVoter.Priority(),
actionAtProcessing: AllocatorAddNonVoter,
expectedInversion: true,
expectedRequeue: false,
},
{
name:               "no inversion: consider rebalance to noop",
priorityAtEnqueue: AllocatorConsiderRebalance.Priority(),
actionAtProcessing: AllocatorNoop,
expectedInversion: false,
expectedRequeue: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
inversion, requeue := CheckPriorityInversion(tc.priorityAtEnqueue, tc.actionAtProcessing)
require.Equal(t, tc.expectedInversion, inversion)
require.Equal(t, tc.expectedRequeue, requeue)
})
}
}

// TestAllocatorPriorityInvariance verifies that allocator priorities are
// multiples of 100 (modulo the documented exceptions) and that no priority is
// lower than AllocatorNoop's. This prevents regressions against the contract
// relied on by CheckPriorityInversion. For details, see the comment above
// AllocatorAction.Priority().
func TestAllocatorPriorityInvariance(t *testing.T) {
defer leaktest.AfterTest(t)()

exceptions := map[AllocatorAction]struct{}{
AllocatorFinalizeAtomicReplicationChange: {},
AllocatorRemoveLearner: {},
AllocatorReplaceDeadVoter: {},
}
lowestPriority := AllocatorNoop.Priority()
for action := AllocatorNoop; action < AllocatorMaxPriority; action++ {
require.GreaterOrEqualf(t, action.Priority(), lowestPriority,
"priority %f is less than AllocatorNoop: likely violating contract",
action.Priority())
if _, ok := exceptions[action]; !ok {
require.Equalf(t, 0, int(action.Priority())%100,
"priority %f is not a multiple of 100: likely violating contract",
action.Priority())
}
}
}