Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13919,6 +13919,134 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NONE
- name: queue.replicate.priority_inversion.addnonvoter
exported_name: queue_replicate_priority_inversion_addnonvoter
description: Number of priority inversions in the replicate queue that resulted in add non-voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.addvoter
exported_name: queue_replicate_priority_inversion_addvoter
description: Number of priority inversions in the replicate queue that resulted in add voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.considerrebalance
exported_name: queue_replicate_priority_inversion_considerrebalance
description: Number of priority inversions in the replicate queue that resulted in consider rebalance action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.noop
exported_name: queue_replicate_priority_inversion_noop
description: Number of priority inversions in the replicate queue that resulted in noop action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.rangeunavailable
exported_name: queue_replicate_priority_inversion_rangeunavailable
description: Number of priority inversions in the replicate queue that resulted in range unavailable action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.removedeadnonvoter
exported_name: queue_replicate_priority_inversion_removedeadnonvoter
description: Number of priority inversions in the replicate queue that resulted in remove dead non-voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.removedeadvoter
exported_name: queue_replicate_priority_inversion_removedeadvoter
description: Number of priority inversions in the replicate queue that resulted in remove dead voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.removedecommissioningnonvoter
exported_name: queue_replicate_priority_inversion_removedecommissioningnonvoter
description: Number of priority inversions in the replicate queue that resulted in remove decommissioning non-voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.removedecommissioningvoter
exported_name: queue_replicate_priority_inversion_removedecommissioningvoter
description: Number of priority inversions in the replicate queue that resulted in remove decommissioning voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.removenonvoter
exported_name: queue_replicate_priority_inversion_removenonvoter
description: Number of priority inversions in the replicate queue that resulted in remove non-voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.removevoter
exported_name: queue_replicate_priority_inversion_removevoter
description: Number of priority inversions in the replicate queue that resulted in remove voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.replacedeadnonvoter
exported_name: queue_replicate_priority_inversion_replacedeadnonvoter
description: Number of priority inversions in the replicate queue that resulted in replace dead non-voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.replacedecommissioningnonvoter
exported_name: queue_replicate_priority_inversion_replacedecommissioningnonvoter
description: Number of priority inversions in the replicate queue that resulted in replace decommissioning non-voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.replacedecommissioningvoter
exported_name: queue_replicate_priority_inversion_replacedecommissioningvoter
description: Number of priority inversions in the replicate queue that resulted in replace decommissioning voter action during processing
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.requeue
exported_name: queue_replicate_priority_inversion_requeue
description: Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time. When the priority has changed from a high priority repair action to rebalance, the replica is requeued to avoid unfairness.
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.priority_inversion.total
exported_name: queue_replicate_priority_inversion_total
description: Total number of priority inversions in the replicate queue. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.process.failure
exported_name: queue_replicate_process_failure
description: Number of replicas which failed processing in the replicate queue
Expand Down
50 changes: 50 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3245,3 +3245,53 @@ func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
}
return ret
}

// roundToNearestPriorityCategory snaps n to the closest multiple of 100. Values
// exactly halfway between two multiples are rounded away from zero (the
// behavior of math.Round). n is expected to be non-negative.
func roundToNearestPriorityCategory(n float64) float64 {
	const categoryWidth = 100.0
	return categoryWidth * math.Round(n/categoryWidth)
}

// withinPriorityRange reports whether priority lies in the closed interval
// spanned by the allocator actions' priorities, i.e. between
// AllocatorNoop.Priority() and
// AllocatorFinalizeAtomicReplicationChange.Priority(), both inclusive.
func withinPriorityRange(priority float64) bool {
	if lowest := AllocatorNoop.Priority(); priority < lowest {
		return false
	}
	return priority <= AllocatorFinalizeAtomicReplicationChange.Priority()
}

// CheckPriorityInversion compares the priority a range was enqueued with
// against the priority of the action computed at processing time.
//
// It returns two flags:
//   - isInversion: the (normalized) enqueue priority is higher than the
//     priority of actionAtProcessing.
//   - shouldRequeue: the inversion is considered unfair and the caller should
//     skip processing the range.
//
// At present an inversion is only treated as unfair when a higher-priority
// enqueue ends up as the lowest-priority action, AllocatorConsiderRebalance,
// at processing time. AllocatorRangeUnavailable and AllocatorNoop are allowed
// through since they are noops.
func CheckPriorityInversion(
	priorityAtEnqueue float64, actionAtProcessing AllocatorAction,
) (isInversion bool, shouldRequeue bool) {
	switch {
	// NB: the enqueue priority must fall within the range of the allocator
	// action priorities to be considered at all. store.Enqueue might enqueue
	// with a very high priority (1e5); such entries should neither be requeued
	// nor counted as an inversion. A priority of -1 is likewise ignored.
	case priorityAtEnqueue == -1, !withinPriorityRange(priorityAtEnqueue):
		return false, false

	// Anything enqueued above the rebalance priority that turns out to be a
	// rebalance at processing time is an unfair inversion.
	case priorityAtEnqueue > AllocatorConsiderRebalance.Priority() &&
		actionAtProcessing == AllocatorConsiderRebalance:
		return true, true
	}

	// NB: the enqueue priority normally equals action.Priority(). For
	// AllocatorAddVoter, AllocatorRemoveDeadVoter and AllocatorRemoveVoter,
	// however, it can be adjusted at enqueue time (see ComputeAction). The
	// adjustment is expected to be small (<100), so the enqueue priority is
	// rounded to the nearest 100 before being compared against
	// actionAtProcessing.Priority(). Without the rounding, a difference caused
	// purely by that adjustment could be reported as an inversion.
	//
	// A consequence is that the priorities of
	// AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner and
	// AllocatorReplaceDeadVoter round to the same value. They are close enough
	// to each other that moving between them is not counted as an inversion.
	roundedEnqueuePriority := roundToNearestPriorityCategory(priorityAtEnqueue)
	return roundedEnqueuePriority > actionAtProcessing.Priority(), false
}
145 changes: 145 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9629,3 +9629,148 @@ func TestAllocatorRebalanceTargetVoterConstraintUnsatisfied(t *testing.T) {
})
}
}

// TestRoundToNearestPriorityCategory verifies that
// roundToNearestPriorityCategory rounds to the closest multiple of 100 for
// zero, positive and negative inputs.
func TestRoundToNearestPriorityCategory(t *testing.T) {
	defer leaktest.AfterTest(t)()

	type roundingCase struct {
		name     string
		input    float64
		expected float64
	}
	cases := []roundingCase{
		{name: "zero", input: 0.0, expected: 0.0},
		{name: "exact multiple of 100", input: 100.0, expected: 100.0},
		{name: "round down to nearest 100", input: 149.0, expected: 100.0},
		{name: "round up to nearest 100", input: 151.0, expected: 200.0},
		{name: "negative exact multiple of 100", input: -200.0, expected: -200.0},
		{name: "negative round down to nearest 100", input: -249.0, expected: -200.0},
		{name: "negative round up to nearest 100", input: -251.0, expected: -300.0},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			actual := roundToNearestPriorityCategory(c.input)
			require.Equal(t, c.expected, actual)
		})
	}
}

// TestCheckPriorityInversion tests the CheckPriorityInversion function.
//
// The first part enqueues at each allocator action's own priority and
// processes as AllocatorConsiderRebalance. The second part is a table of
// targeted cases covering noop actions at processing time, out-of-range
// enqueue priorities, and inversions that must not trigger a requeue.
func TestCheckPriorityInversion(t *testing.T) {
	defer leaktest.AfterTest(t)()

	for action := AllocatorNoop; action <= AllocatorFinalizeAtomicReplicationChange; action++ {
		t.Run(action.String(), func(t *testing.T) {
			// Enqueuing as AllocatorConsiderRebalance, AllocatorNoop or
			// AllocatorRangeUnavailable is expected to be neither an inversion
			// nor a requeue; every other action is expected to be both.
			expected := action != AllocatorConsiderRebalance &&
				action != AllocatorNoop &&
				action != AllocatorRangeUnavailable
			inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
			require.Equal(t, expected, inversion)
			require.Equal(t, expected, requeue)
		})
	}

	// NB: subtest names are kept unique and descriptive so that a failure
	// points at exactly one table entry.
	testCases := []struct {
		name               string
		priorityAtEnqueue  float64
		actionAtProcessing AllocatorAction
		expectedInversion  bool
		expectedRequeue    bool
	}{
		{
			name:               "AllocatorNoop at processing is noop",
			priorityAtEnqueue:  AllocatorFinalizeAtomicReplicationChange.Priority(),
			actionAtProcessing: AllocatorNoop,
			expectedInversion:  true,
			expectedRequeue:    false,
		},
		{
			name:               "AllocatorRangeUnavailable at processing is noop",
			priorityAtEnqueue:  AllocatorFinalizeAtomicReplicationChange.Priority(),
			actionAtProcessing: AllocatorRangeUnavailable,
			expectedInversion:  true,
			expectedRequeue:    false,
		},
		{
			name:               "priority -1 bypasses",
			priorityAtEnqueue:  -1,
			actionAtProcessing: AllocatorConsiderRebalance,
			expectedInversion:  false,
			expectedRequeue:    false,
		},
		{
			name:               "above range priority(1e5)",
			priorityAtEnqueue:  1e5,
			actionAtProcessing: AllocatorConsiderRebalance,
			expectedInversion:  false,
			expectedRequeue:    false,
		},
		{
			name:               "below range priority at -10",
			priorityAtEnqueue:  -10,
			actionAtProcessing: -100,
			expectedInversion:  false,
			expectedRequeue:    false,
		},
		{
			name:               "inversion without requeue: finalize atomic replication change to replace decommissioning non-voter",
			priorityAtEnqueue:  AllocatorFinalizeAtomicReplicationChange.Priority(),
			actionAtProcessing: AllocatorReplaceDecommissioningNonVoter,
			expectedInversion:  true,
			expectedRequeue:    false,
		},
		{
			name:               "inversion without requeue: remove dead voter to add non-voter",
			priorityAtEnqueue:  AllocatorRemoveDeadVoter.Priority(),
			actionAtProcessing: AllocatorAddNonVoter,
			expectedInversion:  true,
			expectedRequeue:    false,
		},
		{
			name:               "no inversion: consider rebalance to noop",
			priorityAtEnqueue:  AllocatorConsiderRebalance.Priority(),
			actionAtProcessing: AllocatorNoop,
			expectedInversion:  false,
			expectedRequeue:    false,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			inversion, requeue := CheckPriorityInversion(tc.priorityAtEnqueue, tc.actionAtProcessing)
			require.Equal(t, tc.expectedInversion, inversion)
			require.Equal(t, tc.expectedRequeue, requeue)
		})
	}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func newConsistencyQueue(store *Store) *consistencyQueue {
q.baseQueue = newBaseQueue(
"consistencyChecker", q, store,
queueConfig{
maxSize: defaultQueueMaxSize,
maxSize: kvserverbase.BaseQueueMaxSize,
needsLease: true,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_pkg_errors//:errors",
"@org_golang_x_time//rate",
],
)
Loading
Loading