From fc6018901829019c4dd9dce14b2bca484066ccb7 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Tue, 26 Aug 2025 22:59:42 -0400 Subject: [PATCH 1/8] kvserver: add PriorityInversionRequeue This commit adds a new cluster setting PriorityInversionRequeue that controls whether the replicate queue should requeue replicas when their priority at enqueue time differs significantly from their priority at processing time (e.g. dropping from top 3 to the lowest priority). --- pkg/kv/kvserver/replicate_queue.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 0de70b4d3d46..fc35ebae49aa 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -100,6 +100,19 @@ var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetti 0, ) +// TODO(wenyihu6): move these cluster settings to kvserverbase + +// PriorityInversionRequeue is a setting that controls whether to requeue +// replicas when their priority at enqueue time and processing time is inverted +// too much (e.g. dropping from a repair cation to AllocatorConsiderRebalance). +var PriorityInversionRequeue = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.priority_inversion_requeue_replicate_queue.enabled", + "whether the requeue replicas should requeue when enqueued for "+ + "repair action but ended up consider rebalancing during processing", + false, +) + var ( metaReplicateQueueAddReplicaCount = metric.Metadata{ Name: "queue.replicate.addreplica", From e453962f6998dc2e0b7c6274c6e63e10e294d289 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Tue, 26 Aug 2025 23:11:05 -0400 Subject: [PATCH 2/8] kvserver: requeue on priority inversion for replicate queue Previously, a replica could enter the queue with high priority but, by the time it was processed, the action planned for this replica may have a low priority, causing us to perform low priority work. Specifically, we are mostly worried about cases where the priority changes from any of the repair actions to consider rebalance. Rebalancing could take a long time and block other ranges enqueued with actual repair action needed. This commit ensures that such replicas are requeued instead, avoiding priority inversions. --- .../allocator/allocatorimpl/allocator.go | 50 ++++++ .../allocator/allocatorimpl/allocator_test.go | 145 ++++++++++++++++++ pkg/kv/kvserver/replicate_queue.go | 22 +++ 3 files changed, 217 insertions(+) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index e48c57c39501..9632809728cd 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -3245,3 +3245,53 @@ func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID { } return ret } + +// roundToNearestPriorityCategory rounds a priority to the nearest 100. n should be non-negative. +func roundToNearestPriorityCategory(n float64) float64 { + return math.Round(n/100.0) * 100 +} + +// WithinPriorityRange checks if a priority is within the range of possible +// priorities for the allocator actions. +func withinPriorityRange(priority float64) bool { + return AllocatorNoop.Priority() <= priority && priority <= AllocatorFinalizeAtomicReplicationChange.Priority() +} + +// CheckPriorityInversion checks if the priority at enqueue time is higher than +// the priority corresponding to the action computed at processing time. 
It +// returns whether there was a priority inversion and whether the caller should +// skip the processing of the range since the inversion is considered unfair. +// Currently, we only consider the inversion as unfair if it has went from a +// repair action to lowest priority (AllocatorConsiderRebalance). We let +// AllocatorRangeUnavailable, AllocatorNoop pass through since they are noop. +func CheckPriorityInversion( + priorityAtEnqueue float64, actionAtProcessing AllocatorAction, +) (isInversion bool, shouldRequeue bool) { + // NB: we need to check for when priorityAtEnqueue falls within the range + // of the allocator actions because store.Enqueue might enqueue things with + // a very high priority (1e5). In those cases, we do not want to requeue + // these actions or count it as an inversion. + if priorityAtEnqueue == -1 || !withinPriorityRange(priorityAtEnqueue) { + return false, false + } + + if priorityAtEnqueue > AllocatorConsiderRebalance.Priority() && actionAtProcessing == AllocatorConsiderRebalance { + return true, true + } + + // NB: Usually, the priority at enqueue time should correspond to + // action.Priority(). However, for AllocatorAddVoter, + // AllocatorRemoveDeadVoter, AllocatorRemoveVoter, the priority can be + // adjusted at enqueue time (See ComputeAction for more details). However, we + // expect the adjustment to be relatively small (<100). So we round the + // priority to the nearest 100 to compare against + // actionAtProcessing.Priority(). Without this rounding, we might treat going + // from 10000 to 999 as an inversion, but it was just due to the adjustment. + // Note that priorities at AllocatorFinalizeAtomicReplicationChange, + // AllocatorRemoveLearner, and AllocatorReplaceDeadVoter will be rounded to + // the same priority. They are so close to each other, so we don't really + // count it as an inversion among them. + normPriorityAtEnqueue := roundToNearestPriorityCategory(priorityAtEnqueue) + isInversion = normPriorityAtEnqueue > actionAtProcessing.Priority() + return isInversion, false +} diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 9e443e0edfbc..a67f0fb682ef 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -9629,3 +9629,148 @@ func TestAllocatorRebalanceTargetVoterConstraintUnsatisfied(t *testing.T) { }) } } + +// TestRoundToNearestPriorityCategory tests the RoundToNearestPriorityCategory +// function. +func TestRoundToNearestPriorityCategory(t *testing.T) { + defer leaktest.AfterTest(t)() + + testCases := []struct { + name string + input float64 + expected float64 + }{ + { + name: "zero", + input: 0.0, + expected: 0.0, + }, + { + name: "exact multiple of 100", + input: 100.0, + expected: 100.0, + }, + { + name: "round down to nearest 100", + input: 149.0, + expected: 100.0, + }, + { + name: "round up to nearest 100", + input: 151.0, + expected: 200.0, + }, + { + name: "negative exact multiple of 100", + input: -200.0, + expected: -200.0, + }, + { + name: "negative round down to nearest 100", + input: -249.0, + expected: -200.0, + }, + { + name: "negative round up to nearest 100", + input: -251.0, + expected: -300.0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, roundToNearestPriorityCategory(tc.input)) + }) + } +} + +// TestCheckPriorityInversion tests the CheckPriorityInversion function. 
+func TestCheckPriorityInversion(t *testing.T) { + defer leaktest.AfterTest(t)() + + for action := AllocatorNoop; action <= AllocatorFinalizeAtomicReplicationChange; action++ { + t.Run(action.String(), func(t *testing.T) { + if action == AllocatorConsiderRebalance || action == AllocatorNoop || action == AllocatorRangeUnavailable { + inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance) + require.False(t, inversion) + require.False(t, requeue) + } else { + inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance) + require.True(t, inversion) + require.True(t, requeue) + } + }) + } + + testCases := []struct { + name string + priorityAtEnqueue float64 + actionAtProcessing AllocatorAction + expectedInversion bool + expectedRequeue bool + }{ + { + name: "AllocatorNoop at processing is noop", + priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(), + actionAtProcessing: AllocatorNoop, + expectedInversion: true, + expectedRequeue: false, + }, + { + name: "AllocatorRangeUnavailable at processing is noop", + priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(), + actionAtProcessing: AllocatorRangeUnavailable, + expectedInversion: true, + expectedRequeue: false, + }, + { + name: "priority -1 bypasses", + priorityAtEnqueue: -1, + actionAtProcessing: AllocatorConsiderRebalance, + expectedInversion: false, + expectedRequeue: false, + }, + { + name: "above range priority(1e5)", + priorityAtEnqueue: 1e5, + actionAtProcessing: AllocatorConsiderRebalance, + expectedInversion: false, + expectedRequeue: false, + }, + { + name: "below range priority at -10", + priorityAtEnqueue: -10, + actionAtProcessing: -100, + expectedInversion: false, + expectedRequeue: false, + }, + { + name: "inversion but small priority changes", + priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(), + actionAtProcessing: AllocatorReplaceDecommissioningNonVoter, + expectedInversion: true, + expectedRequeue: false, + }, + { + name: "inversion but small priority changes", + priorityAtEnqueue: AllocatorRemoveDeadVoter.Priority(), + actionAtProcessing: AllocatorAddNonVoter, + expectedInversion: true, + expectedRequeue: false, + }, + { + name: "inversion but small priority changes", + priorityAtEnqueue: AllocatorConsiderRebalance.Priority(), + actionAtProcessing: AllocatorNoop, + expectedInversion: false, + expectedRequeue: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + inversion, requeue := CheckPriorityInversion(tc.priorityAtEnqueue, tc.actionAtProcessing) + require.Equal(t, tc.expectedInversion, inversion) + require.Equal(t, tc.expectedRequeue, requeue) + }) + } +} diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index fc35ebae49aa..b64b458ba157 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -696,6 +696,10 @@ func (rq *replicateQueue) process( } if err != nil { + if requeue { + log.KvDistribution.VEventf(ctx, 1, "re-queuing on errors: %v", err) + rq.maybeAdd(ctx, repl, rq.store.Clock().NowAsClockTimestamp()) + } return false, err } @@ -890,6 +894,24 @@ func (rq *replicateQueue) processOneChange( ) (requeue bool, _ error) { change, err := rq.planner.PlanOneChange( ctx, repl, desc, conf, plan.PlannerOptions{Scatter: scatter}) + + inversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(priorityAtEnqueue, change.Action) + if inversion { + log.KvDistribution.Infof(ctx, + "priority 
inversion during process: shouldRequeue = %t action=%s, priority=%v, enqueuePriority=%v", + shouldRequeue, change.Action, change.Action.Priority(), priorityAtEnqueue) + } + if PriorityInversionRequeue.Get(&rq.store.cfg.Settings.SV) { + if shouldRequeue { + // Return true here to requeue the range. We can't return an error here + // because rq.process only requeue when error is nil. See + // replicateQueue.process for more details. + return true /*requeue*/, maybeAnnotateDecommissionErr( + errors.Errorf("requing due to priority inversion: action=%s, priority=%v, enqueuePriority=%v", + change.Action, change.Action.Priority(), priorityAtEnqueue), change.Action) + } + } + // When there is an error planning a change, return the error immediately // and do not requeue. It is unlikely that the range or storepool state // will change quickly enough in order to not get the same error and From baa3a95e52cfb0e632c50d42c10abbc80404d210 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Tue, 26 Aug 2025 23:26:51 -0400 Subject: [PATCH 3/8] kvserver: track priority inversion in replicate queue metrics Previously, replicas could be enqueued at a high priority but end up processing a lower-priority actions, causing priority inversion and unfairness to other replicas behind them that needs a repair action. This commit adds metrics to track such cases. In addition, this commit also adds metrics to track when replicas are requeued in the replicate queue due to a priority inversion from a repair action to a rebalance action. --- docs/generated/metrics/metrics.yaml | 128 +++++++++++++++++++ pkg/kv/kvserver/replicate_queue.go | 182 ++++++++++++++++++++++++++++ 2 files changed, 310 insertions(+) diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml index 599e99e367bf..b71219f8d3eb 100644 --- a/docs/generated/metrics/metrics.yaml +++ b/docs/generated/metrics/metrics.yaml @@ -13919,6 +13919,134 @@ layers: unit: COUNT aggregation: AVG derivative: NONE + - name: queue.replicate.priority_inversion.addnonvoter + exported_name: queue_replicate_priority_inversion_addnonvoter + description: Number of priority inversions in the replicate queue that resulted in add non-voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.addvoter + exported_name: queue_replicate_priority_inversion_addvoter + description: Number of priority inversions in the replicate queue that resulted in add voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.considerrebalance + exported_name: queue_replicate_priority_inversion_considerrebalance + description: Number of priority inversions in the replicate queue that resulted in consider rebalance action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.noop + exported_name: queue_replicate_priority_inversion_noop + description: Number of priority inversions in the replicate queue that resulted in noop action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.rangeunavailable + exported_name: queue_replicate_priority_inversion_rangeunavailable + description: Number of 
priority inversions in the replicate queue that resulted in range unavailable action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.removedeadnonvoter + exported_name: queue_replicate_priority_inversion_removedeadnonvoter + description: Number of priority inversions in the replicate queue that resulted in remove dead non-voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.removedeadvoter + exported_name: queue_replicate_priority_inversion_removedeadvoter + description: Number of priority inversions in the replicate queue that resulted in remove dead voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.removedecommissioningnonvoter + exported_name: queue_replicate_priority_inversion_removedecommissioningnonvoter + description: Number of priority inversions in the replicate queue that resulted in remove decommissioning non-voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.removedecommissioningvoter + exported_name: queue_replicate_priority_inversion_removedecommissioningvoter + description: Number of priority inversions in the replicate queue that resulted in remove decommissioning voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.removenonvoter + exported_name: queue_replicate_priority_inversion_removenonvoter + description: Number of priority inversions in the replicate queue that resulted in remove non-voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.removevoter + exported_name: queue_replicate_priority_inversion_removevoter + description: Number of priority inversions in the replicate queue that resulted in remove voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.replacedeadnonvoter + exported_name: queue_replicate_priority_inversion_replacedeadnonvoter + description: Number of priority inversions in the replicate queue that resulted in replace dead non-voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.replacedecommissioningnonvoter + exported_name: queue_replicate_priority_inversion_replacedecommissioningnonvoter + description: Number of priority inversions in the replicate queue that resulted in replace decommissioning non-voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.replacedecommissioningvoter + exported_name: queue_replicate_priority_inversion_replacedecommissioningvoter + description: Number of priority inversions in the replicate queue that resulted in replace 
decommissioning voter action during processing + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.requeue + exported_name: queue_replicate_priority_inversion_requeue + description: Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time. When the priority has changed from a high priority repair action to rebalance, the change is requeued to avoid unfairness. + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.priority_inversion.total + exported_name: queue_replicate_priority_inversion_total + description: Total number of priority inversions in the replicate queue. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE - name: queue.replicate.process.failure exported_name: queue_replicate_process_failure description: Number of replicas which failed processing in the replicate queue diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index b64b458ba157..46e4536843f4 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -303,6 +303,106 @@ var ( Measurement: "Replicas", Unit: metric.Unit_COUNT, } + metaReplicateQueueRequeueDueToPriorityInversion = metric.Metadata{ + Name: "queue.replicate.priority_inversion.requeue", + Help: "Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. " + + "A priority inversion occurs when the priority at processing time ends up being lower " + + "than at enqueue time. When the priority has changed from a high priority repair action to rebalance, " + + "the change is requeued to avoid unfairness.", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionTotal = metric.Metadata{ + Name: "queue.replicate.priority_inversion.total", + Help: "Total number of priority inversions in the replicate queue. 
" + + "A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForAddVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.addvoter", + Help: "Number of priority inversions in the replicate queue that resulted in add voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForReplaceDecommissioningVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.replacedecommissioningvoter", + Help: "Number of priority inversions in the replicate queue that resulted in replace decommissioning voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForRemoveDeadVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.removedeadvoter", + Help: "Number of priority inversions in the replicate queue that resulted in remove dead voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForRemoveDecommissioningVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.removedecommissioningvoter", + Help: "Number of priority inversions in the replicate queue that resulted in remove decommissioning voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForRemoveVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.removevoter", + Help: "Number of priority inversions in the replicate queue that resulted in remove voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForReplaceDeadNonVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.replacedeadnonvoter", + Help: "Number of priority inversions in the replicate queue that resulted in replace dead non-voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForAddNonVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.addnonvoter", + Help: "Number of priority inversions in the replicate queue that resulted in add non-voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForReplaceDecommissioningNonVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.replacedecommissioningnonvoter", + Help: "Number of priority inversions in the replicate queue that resulted in replace decommissioning non-voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForRemoveDeadNonVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.removedeadnonvoter", + Help: "Number of priority inversions in the replicate queue that resulted in remove dead non-voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForRemoveDecommissioningNonVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.removedecommissioningnonvoter", + Help: "Number of priority inversions in the replicate queue that resulted in remove decommissioning non-voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + 
metaReplicateQueuePriorityInversionForRemoveNonVoterCount = metric.Metadata{ + Name: "queue.replicate.priority_inversion.removenonvoter", + Help: "Number of priority inversions in the replicate queue that resulted in remove non-voter action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForConsiderRebalance = metric.Metadata{ + Name: "queue.replicate.priority_inversion.considerrebalance", + Help: "Number of priority inversions in the replicate queue that resulted in consider rebalance action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForRangeUnavailable = metric.Metadata{ + Name: "queue.replicate.priority_inversion.rangeunavailable", + Help: "Number of priority inversions in the replicate queue that resulted in range unavailable action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueuePriorityInversionForNoop = metric.Metadata{ + Name: "queue.replicate.priority_inversion.noop", + Help: "Number of priority inversions in the replicate queue that resulted in noop action during processing", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } ) // quorumError indicates a retryable error condition which sends replicas being @@ -325,6 +425,7 @@ func (e *quorumError) Error() string { func (*quorumError) PurgatoryErrorMarker() {} // ReplicateQueueMetrics is the set of metrics for the replicate queue. +// TODO(wenyihu6): metrics initialization could be cleaned up by using a map. type ReplicateQueueMetrics struct { AddReplicaCount *metric.Counter AddVoterReplicaCount *metric.Counter @@ -362,6 +463,27 @@ type ReplicateQueueMetrics struct { // TODO(sarkesian): Consider adding metrics for AllocatorRemoveLearner, // AllocatorConsiderRebalance, and AllocatorFinalizeAtomicReplicationChange // allocator actions. + + // Priority Inversion. Not tracked for + // AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner, + // AllocatorReplaceDeadVoter since they are the highest priority actions and + // cannot be inverted. 
(17 total actions-3=14) + RequeueDueToPriorityInversion *metric.Counter + PriorityInversionTotal *metric.Counter + PriorityInversionForAddVoterCount *metric.Counter + PriorityInversionForReplaceDecommissioningVoterCount *metric.Counter + PriorityInversionForRemoveDeadVoterCount *metric.Counter + PriorityInversionForRemoveDecommissioningVoterCount *metric.Counter + PriorityInversionForRemoveVoterCount *metric.Counter + PriorityInversionForReplaceDeadNonVoterCount *metric.Counter + PriorityInversionForAddNonVoterCount *metric.Counter + PriorityInversionForReplaceDecommissioningNonVoterCount *metric.Counter + PriorityInversionForRemoveDeadNonVoterCount *metric.Counter + PriorityInversionForRemoveDecommissioningNonVoterCount *metric.Counter + PriorityInversionForRemoveNonVoterCount *metric.Counter + PriorityInversionForConsiderRebalance *metric.Counter + PriorityInversionForRangeUnavailable *metric.Counter + PriorityInversionForNoop *metric.Counter } func makeReplicateQueueMetrics() ReplicateQueueMetrics { @@ -398,6 +520,23 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics { ReplaceDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaErrorCount), RemoveDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaSuccessCount), RemoveDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaErrorCount), + + RequeueDueToPriorityInversion: metric.NewCounter(metaReplicateQueueRequeueDueToPriorityInversion), + PriorityInversionTotal: metric.NewCounter(metaReplicateQueuePriorityInversionTotal), + PriorityInversionForAddVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForAddVoterCount), + PriorityInversionForReplaceDecommissioningVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForReplaceDecommissioningVoterCount), + PriorityInversionForRemoveDeadVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForRemoveDeadVoterCount), + PriorityInversionForRemoveDecommissioningVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForRemoveDecommissioningVoterCount), + PriorityInversionForRemoveVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForRemoveVoterCount), + PriorityInversionForReplaceDeadNonVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForReplaceDeadNonVoterCount), + PriorityInversionForAddNonVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForAddNonVoterCount), + PriorityInversionForReplaceDecommissioningNonVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForReplaceDecommissioningNonVoterCount), + PriorityInversionForRemoveDeadNonVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForRemoveDeadNonVoterCount), + PriorityInversionForRemoveDecommissioningNonVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForRemoveDecommissioningNonVoterCount), + PriorityInversionForRemoveNonVoterCount: metric.NewCounter(metaReplicateQueuePriorityInversionForRemoveNonVoterCount), + PriorityInversionForConsiderRebalance: metric.NewCounter(metaReplicateQueuePriorityInversionForConsiderRebalance), + PriorityInversionForRangeUnavailable: metric.NewCounter(metaReplicateQueuePriorityInversionForRangeUnavailable), + PriorityInversionForNoop: metric.NewCounter(metaReplicateQueuePriorityInversionForNoop), } } @@ -521,6 +660,47 @@ func (metrics *ReplicateQueueMetrics) trackErrorByAllocatorAction( } +// trackPriorityInversion tracks the action that the 
replicate queue ended up +// processing when the priority at enqueue time was higher than the priority at +// processing time. +func (metrics *ReplicateQueueMetrics) trackPriorityInversion( + actionAtProcessingTime allocatorimpl.AllocatorAction, +) { + metrics.PriorityInversionTotal.Inc(1) + switch actionAtProcessingTime { + case allocatorimpl.AllocatorAddVoter: + metrics.PriorityInversionForAddVoterCount.Inc(1) + case allocatorimpl.AllocatorReplaceDecommissioningVoter: + metrics.PriorityInversionForReplaceDecommissioningVoterCount.Inc(1) + case allocatorimpl.AllocatorRemoveDeadVoter: + metrics.PriorityInversionForRemoveDeadVoterCount.Inc(1) + case allocatorimpl.AllocatorRemoveDecommissioningVoter: + metrics.PriorityInversionForRemoveDecommissioningVoterCount.Inc(1) + case allocatorimpl.AllocatorRemoveVoter: + metrics.PriorityInversionForRemoveVoterCount.Inc(1) + case allocatorimpl.AllocatorReplaceDeadNonVoter: + metrics.PriorityInversionForReplaceDeadNonVoterCount.Inc(1) + case allocatorimpl.AllocatorAddNonVoter: + metrics.PriorityInversionForAddNonVoterCount.Inc(1) + case allocatorimpl.AllocatorReplaceDecommissioningNonVoter: + metrics.PriorityInversionForReplaceDecommissioningNonVoterCount.Inc(1) + case allocatorimpl.AllocatorRemoveDeadNonVoter: + metrics.PriorityInversionForRemoveDeadNonVoterCount.Inc(1) + case allocatorimpl.AllocatorRemoveDecommissioningNonVoter: + metrics.PriorityInversionForRemoveDecommissioningNonVoterCount.Inc(1) + case allocatorimpl.AllocatorRemoveNonVoter: + metrics.PriorityInversionForRemoveNonVoterCount.Inc(1) + case allocatorimpl.AllocatorConsiderRebalance: + metrics.PriorityInversionForConsiderRebalance.Inc(1) + case allocatorimpl.AllocatorRangeUnavailable: + metrics.PriorityInversionForRangeUnavailable.Inc(1) + case allocatorimpl.AllocatorNoop: + metrics.PriorityInversionForNoop.Inc(1) + default: + panic("unhandled default case") + } +} + // trackProcessResult increases the corresponding success/error count metric for // processing a particular allocator action through the replicate queue. func (metrics *ReplicateQueueMetrics) trackResultByAllocatorAction( @@ -897,12 +1077,14 @@ func (rq *replicateQueue) processOneChange( inversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(priorityAtEnqueue, change.Action) if inversion { + rq.metrics.trackPriorityInversion(change.Action) log.KvDistribution.Infof(ctx, "priority inversion during process: shouldRequeue = %t action=%s, priority=%v, enqueuePriority=%v", shouldRequeue, change.Action, change.Action.Priority(), priorityAtEnqueue) } if PriorityInversionRequeue.Get(&rq.store.cfg.Settings.SV) { if shouldRequeue { + rq.metrics.RequeueDueToPriorityInversion.Inc(1) // Return true here to requeue the range. We can't return an error here // because rq.process only requeue when error is nil. See // replicateQueue.process for more details. 
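Note on the inversion check introduced in the patches above: the sketch below restates its core logic as a small, self-contained Go program. The priority constants (repairPriority, considerRebalancePriority, maxActionPriority) are illustrative stand-ins rather than the real allocatorimpl values, and the processing-time side is simplified to a bare priority instead of an AllocatorAction; it is meant only to show the rounding-based comparison and the "requeue only when a repair degenerates into a rebalance" rule, not to mirror the production code exactly.

package main

import (
	"fmt"
	"math"
)

// Stand-in priorities, loosely modeled on the allocator actions described
// above: repair actions live well above zero, rebalance sits at the bottom.
// These constants are illustrative only.
const (
	considerRebalancePriority = 0.0
	repairPriority            = 10000.0
	maxActionPriority         = 12000.0
)

// roundToNearestHundred mirrors the rounding used to ignore the small
// (<100) enqueue-time priority adjustments mentioned in the comments above.
func roundToNearestHundred(p float64) float64 {
	return math.Round(p/100.0) * 100
}

// checkInversion reports whether the enqueue-time priority is meaningfully
// higher than the processing-time priority, and whether the replica should be
// requeued (only when a repair-level priority degenerated into a rebalance).
func checkInversion(enqueuePriority, processingPriority float64) (inversion, requeue bool) {
	// Ignore the -1 sentinel and priorities outside the action range (e.g. the
	// 1e5 priority used by manual enqueues).
	if enqueuePriority == -1 || enqueuePriority < considerRebalancePriority || enqueuePriority > maxActionPriority {
		return false, false
	}
	if enqueuePriority > considerRebalancePriority && processingPriority == considerRebalancePriority {
		return true, true
	}
	inversion = roundToNearestHundred(enqueuePriority) > processingPriority
	return inversion, false
}

func main() {
	// Enqueued for repair, but only a rebalance is needed by processing time:
	// an inversion, and worth requeuing.
	fmt.Println(checkInversion(repairPriority, considerRebalancePriority)) // true true
	// A small enqueue-time adjustment (999 vs. 1000) rounds away and is not
	// treated as an inversion.
	fmt.Println(checkInversion(999, 1000)) // false false
	// Out-of-range enqueue priority (manual enqueue at 1e5) is ignored.
	fmt.Println(checkInversion(1e5, considerRebalancePriority)) // false false
}
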
From a797d85168b2a635268e65622009442b0d7dacf8 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Wed, 27 Aug 2025 21:21:54 -0400 Subject: [PATCH 4/8] tracing --- pkg/kv/kvserver/metrics.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 9c7780de8837..37a78e903052 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -3182,6 +3182,9 @@ type StoreMetrics struct { ReplicaGCQueueFailures *metric.Counter ReplicaGCQueuePending *metric.Gauge ReplicaGCQueueProcessingNanos *metric.Counter + ReplicateQueueEnqueueFailure *metric.Counter + ReplicateQueueEnqueueSuccess *metric.Counter + ReplicateQueueEnqueueDropped *metric.Counter ReplicateQueueSuccesses *metric.Counter ReplicateQueueFailures *metric.Counter ReplicateQueuePending *metric.Gauge From 207ebc429341145f2534e9572139d0fb2be596c7 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Thu, 28 Aug 2025 10:57:34 -0400 Subject: [PATCH 5/8] a --- pkg/kv/kvserver/queue.go | 10 +++++++--- pkg/kv/kvserver/replicate_queue.go | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index 066f4cb39d4a..c3cde1c96464 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -331,6 +331,7 @@ type queueConfig struct { failures *metric.Counter // pending is a gauge measuring current replica count pending. pending *metric.Gauge + // // processingNanos is a counter measuring total nanoseconds spent processing // replicas. processingNanos *metric.Counter @@ -773,6 +774,7 @@ func (bq *baseQueue) addInternal( // priority element, but it would require additional bookkeeping or a linear // scan. if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize { + bq.failures replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1] log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v", priority, replicaItemToDrop.replicaID) @@ -918,7 +920,7 @@ func (bq *baseQueue) processOneAsyncAndReleaseSem( defer func() { <-bq.processSem }() start := timeutil.Now() err := bq.processReplica(ctx, repl, priorityAtEnqueue) - bq.recordProcessDuration(ctx, timeutil.Since(start)) + bq.recordProcessDuration(ctx, timeutil.Since(start), priorityAtEnqueue) bq.finishProcessingReplica(ctx, stopper, repl, err) }); err != nil { // Release semaphore if we can't start the task, normally this only @@ -936,9 +938,11 @@ func (bq *baseQueue) lastProcessDuration() time.Duration { } // recordProcessDuration records the duration of a processing run. -func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duration) { +func (bq *baseQueue) recordProcessDuration( + ctx context.Context, dur time.Duration, priorityAtEnqueue float64, +) { if log.V(2) { - log.Dev.Infof(ctx, "done %s", dur) + log.Dev.Infof(ctx, "done=%s, priority(enqueue)=%.2f", dur, priorityAtEnqueue) } bq.processingNanos.Inc(dur.Nanoseconds()) atomic.StoreInt64(&bq.processDur, int64(dur)) diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 46e4536843f4..5fc31ab090fb 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -113,6 +113,19 @@ var PriorityInversionRequeue = settings.RegisterBoolSetting( false, ) +var ReplicateQueueMaxSize = settings.RegisterIntSetting( + settings.ApplicationLevel, + "kv.replicate_queue.max_size", + "controls max size of the replicate queue. 
Replicate queue starts dropping replicas when exceeding the max size.", + defaultQueueMaxSize, + settings.WithValidateInt(func(v int64) error { + if v < defaultQueueMaxSize { + return errors.Errorf("cannot be set to a value lower than %d: %d", defaultQueueMaxSize, v) + } + return nil + }), +) + var ( metaReplicateQueueAddReplicaCount = metric.Metadata{ Name: "queue.replicate.addreplica", @@ -759,7 +772,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica rq.baseQueue = newBaseQueue( "replicate", rq, store, queueConfig{ - maxSize: defaultQueueMaxSize, + maxSize: int(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV)), needsLease: true, needsSpanConfigs: true, acceptsUnsplitRanges: false, From 4450db265e2b9ebebf907eca0cfd9d0c68b68eda Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Thu, 28 Aug 2025 18:56:51 -0400 Subject: [PATCH 6/8] callback --- pkg/kv/kvserver/mvcc_gc_queue.go | 2 +- pkg/kv/kvserver/queue.go | 126 +++++++++++++++++----- pkg/kv/kvserver/queue_helpers_testutil.go | 2 +- pkg/kv/kvserver/replica.go | 29 +++-- pkg/kv/kvserver/replica_backpressure.go | 12 ++- pkg/kv/kvserver/store_gossip.go | 4 +- 6 files changed, 134 insertions(+), 41 deletions(-) diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index 5dbee753409c..e29b8d7c31e6 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -878,7 +878,7 @@ func (mgcq *mvccGCQueue) scanReplicasForHiPriGCHints( if !isLeaseHolder { return true } - added, _ := mgcq.addInternal(ctx, desc, replica.ReplicaID(), deleteRangePriority) + added, _ := mgcq.addInternal(ctx, desc, replica.ReplicaID(), deleteRangePriority, noopProcessCallback) if added { mgcq.store.metrics.GCEnqueueHighPriority.Inc(1) foundReplicas++ diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index c3cde1c96464..1d9f82d01465 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -112,9 +112,25 @@ type PurgatoryError interface { PurgatoryErrorMarker() // dummy method for unique interface } -// processCallback is a hook that is called when a replica finishes processing. -// It is called with the result of the process attempt. -type processCallback func(error) +var noopProcessCallback = processCallback{ + onProcessResult: func(err error) {}, + onEnqueueResult: func(indexOnHeap int, err error) {}, +} + +// processCallback is a hook that is called when a replica is enqueued or +// finishes processing. Any of the fields can be nil. +type processCallback struct { + // onProcessResult is called with the result of the process attempt. + onProcessResult func(err error) + + // onEnqueueResultis called with the result of the enqueue attempt. If error + // is nil, the index on whether this item sits at in the priority queue is + // also passed in the callback. If error is non-nil, the index passed in the + // callback in -1. Note: indexOnHeap does not represent the item's exact rank + // by priority. It only reflects the item's position in the heap array, which + // gives a rough idea of where it sits in the priority hierarchy. + onEnqueueResult func(indexOnHeap int, err error) +} // A replicaItem holds a replica and metadata about its queue state and // processing state. @@ -147,6 +163,16 @@ func (i *replicaItem) setProcessing() { // registerCallback adds a new callback to be executed when the replicaItem // finishes processing. 
+// +// NB: it is not a strong guarantee that the callback will be executed since +// removeLocked or removeFromReplicaSetLocked may be called without executing +// the callbacks. It may also be called multiple times +// 1. onEnqueueResult may be called when the replicaItem is dropped early due to +// exceeding max queue size +// 2. onProcessResult will be called again when baseQueue processes the replica +// in the puragtory queue TODO(wenyihu6 during review): this is bebhaviour +// change - is this too big of a backport +// TODO(wenyihu6): consider clean the semantics up after backports func (i *replicaItem) registerCallback(cb processCallback) { i.callbacks = append(i.callbacks, cb) } @@ -204,8 +230,13 @@ func (pq *priorityQueue) update(item *replicaItem, priority float64) { } var ( - errQueueDisabled = errors.New("queue disabled") - errQueueStopped = errors.New("queue stopped") + errQueueDisabled = errors.New("queue disabled") + errQueueStopped = errors.New("queue stopped") + errReplicaNotInitialized = errors.New("replica not initialized") + errReplicaAlreadyProcessing = errors.New("replica already processing") + errReplicaAlreadyInPurgatory = errors.New("replica in purgatory") + errReplicaAlreadyInQueue = errors.New("replica already in queue") + errDroppedDueToFullQueueSize = errors.New("queue full") ) func isExpectedQueueError(err error) bool { @@ -328,7 +359,8 @@ type queueConfig struct { // successes is a counter of replicas processed successfully. successes *metric.Counter // failures is a counter of replicas which failed processing. - failures *metric.Counter + failures *metric.Counter + droppedDueToQSize *metric.Counter // pending is a gauge measuring current replica count pending. pending *metric.Gauge // @@ -572,8 +604,10 @@ func (h baseQueueHelper) MaybeAdd( h.bq.maybeAdd(ctx, repl, now) } -func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio float64) { - _, err := h.bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), prio) +func (h baseQueueHelper) Add( + ctx context.Context, repl replicaInQueue, prio float64, processCallback processCallback, +) { + _, err := h.bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), prio, processCallback) if err != nil && log.V(1) { log.Dev.Infof(ctx, "during Add: %s", err) } @@ -581,9 +615,11 @@ func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio floa type queueHelper interface { MaybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) - Add(ctx context.Context, repl replicaInQueue, prio float64) + Add(ctx context.Context, repl replicaInQueue, prio float64, processCallback processCallback) } +var baseQueueAsyncRateLimited = errors.Newf("rate limited") + // Async is a more performant substitute for calling AddAsync or MaybeAddAsync // when many operations are going to be carried out. It invokes the given helper // function in a goroutine if semaphore capacity is available. If the semaphore @@ -595,7 +631,7 @@ type queueHelper interface { // (Best is to pass a constant string.) 
func (bq *baseQueue) Async( ctx context.Context, opName string, wait bool, fn func(ctx context.Context, h queueHelper), -) { +) error { if log.V(3) { log.Dev.InfofDepth(ctx, 2, "%s", redact.Safe(opName)) } @@ -610,12 +646,13 @@ func (bq *baseQueue) Async( if bq.addLogN.ShouldLog() { log.Dev.Infof(ctx, "rate limited in %s: %s", redact.Safe(opName), err) } - return + return baseQueueAsyncRateLimited } go func(ctx context.Context) { defer hdl.Activate(ctx).Release(ctx) fn(ctx, baseQueueHelper{bq}) }(bgCtx) + return nil } // MaybeAddAsync offers the replica to the queue. The queue will only process a @@ -624,18 +661,29 @@ func (bq *baseQueue) Async( func (bq *baseQueue) MaybeAddAsync( ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp, ) { - bq.Async(ctx, "MaybeAdd", false /* wait */, func(ctx context.Context, h queueHelper) { + _ = bq.Async(ctx, "MaybeAdd", false /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, repl, now) }) } +func (bq *baseQueue) AddAsyncWithCallback( + ctx context.Context, repl replicaInQueue, prio float64, processCallback processCallback, +) { + if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) { + h.Add(ctx, repl, prio, processCallback) + }); err != nil { + processCallback.onEnqueueResult(-1 /*indexOnHeap*/, err) + } +} + // AddAsync adds the replica to the queue. Unlike MaybeAddAsync, it will wait // for other operations to finish instead of turning into a noop (because // unlikely MaybeAdd, Add is not subject to being called opportunistically). func (bq *baseQueue) AddAsync(ctx context.Context, repl replicaInQueue, prio float64) { - bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) { - h.Add(ctx, repl, prio) - }) + if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) { + h.Add(ctx, repl, prio, noopProcessCallback) + }); err != nil { + } } func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) { @@ -696,7 +744,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. return } } - _, err = bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority) + _, err = bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, noopProcessCallback) if !isExpectedQueueError(err) { log.Dev.Errorf(ctx, "unable to add: %+v", err) } @@ -706,20 +754,26 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. // the replica is already queued at a lower priority, updates the existing // priority. Expects the queue lock to be held by caller. func (bq *baseQueue) addInternal( - ctx context.Context, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, priority float64, + ctx context.Context, + desc *roachpb.RangeDescriptor, + replicaID roachpb.ReplicaID, + priority float64, + processCallback processCallback, ) (bool, error) { // NB: this is intentionally outside of bq.mu to avoid having to consider // lock ordering constraints. if !desc.IsInitialized() { // We checked this above in MaybeAdd(), but we need to check it // again for Add(). 
- return false, errors.New("replica not initialized") + processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaNotInitialized) + return false, errReplicaNotInitialized } bq.mu.Lock() defer bq.mu.Unlock() if bq.mu.stopped { + processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errQueueStopped) return false, errQueueStopped } @@ -732,12 +786,14 @@ func (bq *baseQueue) addInternal( if log.V(3) { log.Dev.Infof(ctx, "queue disabled") } + processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errQueueDisabled) return false, errQueueDisabled } } // If the replica is currently in purgatory, don't re-add it. if _, ok := bq.mu.purgatory[desc.RangeID]; ok { + processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaAlreadyInPurgatory) return false, nil } @@ -747,6 +803,7 @@ func (bq *baseQueue) addInternal( if item.processing { wasRequeued := item.requeue item.requeue = true + processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaAlreadyProcessing) return !wasRequeued, nil } @@ -759,6 +816,7 @@ func (bq *baseQueue) addInternal( } bq.mu.priorityQ.update(item, priority) } + processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaAlreadyInQueue) return false, nil } @@ -766,6 +824,7 @@ func (bq *baseQueue) addInternal( log.Dev.Infof(ctx, "adding: priority=%0.3f", priority) } item = &replicaItem{rangeID: desc.RangeID, replicaID: replicaID, priority: priority} + item.registerCallback(processCallback) bq.addLocked(item) // If adding this replica has pushed the queue past its maximum size, remove @@ -774,10 +833,13 @@ func (bq *baseQueue) addInternal( // priority element, but it would require additional bookkeeping or a linear // scan. if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize { - bq.failures replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1] + bq.droppedDueToQSize.Inc(1) log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v", priority, replicaItemToDrop.replicaID) + for _, cb := range replicaItemToDrop.callbacks { + cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize) + } bq.removeLocked(replicaItemToDrop) } // Signal the processLoop that a replica has been added. @@ -786,6 +848,7 @@ func (bq *baseQueue) addInternal( default: // No need to signal again. } + processCallback.onEnqueueResult(-1 /*indexOnHeap*/, nil) return true, nil } @@ -795,17 +858,16 @@ func (bq *baseQueue) addInternal( // the range is not in the queue (either waiting or processing), the method // returns false. // -// NB: If the replica this attaches to is dropped from an overfull queue, this -// callback is never called. This is surprising, but the single caller of this -// is okay with these semantics. Adding new uses is discouraged without cleaning -// up the contract of this method, but this code doesn't lend itself readily to -// upholding invariants so there may need to be some cleanup first. +// NB: Adding new uses is discouraged without cleaning up the contract of this +// method, but this code doesn't lend itself readily to upholding invariants so +// there may need to be some cleanup first. For example, +// removeFromReplicaSetLocked may be called without invoking these callbacks. 
func (bq *baseQueue) MaybeAddCallback(rangeID roachpb.RangeID, cb processCallback) bool { bq.mu.Lock() defer bq.mu.Unlock() if purgatoryErr, ok := bq.mu.purgatory[rangeID]; ok { - cb(purgatoryErr) + cb.onProcessResult(purgatoryErr) return true } if item, ok := bq.mu.replicas[rangeID]; ok { @@ -1177,7 +1239,7 @@ func (bq *baseQueue) finishProcessingReplica( // Call any registered callbacks. for _, cb := range callbacks { - cb(err) + cb.onProcessResult(err) } // Handle failures. @@ -1196,7 +1258,7 @@ func (bq *baseQueue) finishProcessingReplica( // purgatory. if purgErr, ok := IsPurgatoryError(err); ok { bq.mu.Lock() - bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr, priority /*priorityAtEnqueue*/) + bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr, priority /*priorityAtEnqueue*/, callbacks /*processCallback*/) bq.mu.Unlock() return } @@ -1221,6 +1283,7 @@ func (bq *baseQueue) addToPurgatoryLocked( repl replicaInQueue, purgErr PurgatoryError, priorityAtEnqueue float64, + processCallback []processCallback, ) { bq.mu.AssertHeld() @@ -1244,7 +1307,14 @@ func (bq *baseQueue) addToPurgatoryLocked( return } - item := &replicaItem{rangeID: repl.GetRangeID(), replicaID: repl.ReplicaID(), index: -1, priority: priorityAtEnqueue} + item := &replicaItem{ + rangeID: repl.GetRangeID(), + replicaID: repl.ReplicaID(), + index: -1, + priority: priorityAtEnqueue, + callbacks: processCallback, + } + bq.mu.replicas[repl.GetRangeID()] = item defer func() { diff --git a/pkg/kv/kvserver/queue_helpers_testutil.go b/pkg/kv/kvserver/queue_helpers_testutil.go index 2a5963cad56f..c0b7946fb81b 100644 --- a/pkg/kv/kvserver/queue_helpers_testutil.go +++ b/pkg/kv/kvserver/queue_helpers_testutil.go @@ -19,7 +19,7 @@ import ( func (bq *baseQueue) testingAdd( ctx context.Context, repl replicaInQueue, priority float64, ) (bool, error) { - return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority) + return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, noopProcessCallback) } func forceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) error { diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 25de63ea1250..8983b0ce6130 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1184,25 +1184,25 @@ func (r *Replica) SetSpanConfig(conf roachpb.SpanConfig, sp roachpb.Span) bool { // impacted by changes to the SpanConfig. This should be called after any // changes to the span configs. 
func (r *Replica) MaybeQueue(ctx context.Context, now hlc.ClockTimestamp) { - r.store.splitQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + _ = r.store.splitQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, r, now) }) - r.store.mergeQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + _ = r.store.mergeQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, r, now) }) if EnqueueInMvccGCQueueOnSpanConfigUpdateEnabled.Get(&r.store.GetStoreConfig().Settings.SV) { - r.store.mvccGCQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + _ = r.store.mvccGCQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, r, now) }) } - r.store.leaseQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + _ = r.store.leaseQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, r, now) }) // The replicate queue has a relatively more expensive queue check // (shouldQueue), because it scales with the number of stores, and // performs more checks. if EnqueueInReplicateQueueOnSpanConfigUpdateEnabled.Get(&r.store.GetStoreConfig().Settings.SV) { - r.store.replicateQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + _ = r.store.replicateQueue.Async(ctx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, r, now) }) } @@ -2966,8 +2966,23 @@ func (r *Replica) maybeEnqueueProblemRange( r.store.metrics.DecommissioningNudgerEnqueue.Inc(1) // TODO(dodeca12): Figure out a better way to track the // decommissioning nudger enqueue failures/errors. - r.store.replicateQueue.AddAsync(ctx, r, - allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority()) + r.store.replicateQueue.AddAsyncWithCallback(ctx, r, + allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority(), processCallback{ + onProcessResult: func(err error) { + if err != nil { + // bump the metrics + log.KvDistribution.Errorf(ctx, + "decommissioning nudger failed to enqueue replica due to %v", err) + } + }, + onEnqueueResult: func(indexOnHeap int, err error) { + if err != nil { + // bump the metrics + log.KvDistribution.Errorf(ctx, + "decommissioning nudger failed to process replica due to %v", err) + } + }, + }) } // SendStreamStats sets the stats for the replica send streams that belong to diff --git a/pkg/kv/kvserver/replica_backpressure.go b/pkg/kv/kvserver/replica_backpressure.go index e185d317a74b..12467e9c8b5b 100644 --- a/pkg/kv/kvserver/replica_backpressure.go +++ b/pkg/kv/kvserver/replica_backpressure.go @@ -207,8 +207,16 @@ func (r *Replica) maybeBackpressureBatch(ctx context.Context, ba *kvpb.BatchRequ // Register a callback on an ongoing split for this range in the splitQueue. splitC := make(chan error, 1) - if !r.store.splitQueue.MaybeAddCallback(r.RangeID, func(err error) { - splitC <- err + if !r.store.splitQueue.MaybeAddCallback(r.RangeID, processCallback{ + onEnqueueResult: func(rank int, err error) {}, + onProcessResult: func(err error) { + select { + case splitC <- err: + default: + // Drop the error on the floor if there is already an error. + return + } + }, }) { // No split ongoing. We may have raced with its completion. 
There's // no good way to prevent this race, so we conservatively allow the diff --git a/pkg/kv/kvserver/store_gossip.go b/pkg/kv/kvserver/store_gossip.go index a284c1e71afe..ff49251b6c05 100644 --- a/pkg/kv/kvserver/store_gossip.go +++ b/pkg/kv/kvserver/store_gossip.go @@ -206,10 +206,10 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { now := s.cfg.Clock.NowAsClockTimestamp() newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { if shouldQueue { - s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { + _ = s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, repl, now) }) - s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { + _ = s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, repl, now) }) } From f8466e4b2e2b7cbc569ced0582581ae367d9a0f5 Mon Sep 17 00:00:00 2001 From: wenyihu6 Date: Thu, 28 Aug 2025 19:56:02 -0400 Subject: [PATCH 7/8] max size --- pkg/kv/kvserver/consistency_queue.go | 2 +- pkg/kv/kvserver/kvserverbase/BUILD.bazel | 1 + pkg/kv/kvserver/kvserverbase/base.go | 33 ++++++++++++++++++++++++ pkg/kv/kvserver/lease_queue.go | 2 +- pkg/kv/kvserver/merge_queue.go | 2 +- pkg/kv/kvserver/mvcc_gc_queue.go | 2 +- pkg/kv/kvserver/queue.go | 18 +++++++++---- pkg/kv/kvserver/raft_log_queue.go | 2 +- pkg/kv/kvserver/raft_snapshot_queue.go | 2 +- pkg/kv/kvserver/replica_gc_queue.go | 2 +- pkg/kv/kvserver/replicate_queue.go | 15 +---------- pkg/kv/kvserver/split_queue.go | 2 +- pkg/kv/kvserver/ts_maintenance_queue.go | 2 +- 13 files changed, 57 insertions(+), 28 deletions(-) diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 2b3b13624f86..48db5381a1d0 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -97,7 +97,7 @@ func newConsistencyQueue(store *Store) *consistencyQueue { q.baseQueue = newBaseQueue( "consistencyChecker", q, store, queueConfig{ - maxSize: defaultQueueMaxSize, + maxSize: kvserverbase.BaseQueueMaxSize, needsLease: true, needsSpanConfigs: false, acceptsUnsplitRanges: true, diff --git a/pkg/kv/kvserver/kvserverbase/BUILD.bazel b/pkg/kv/kvserver/kvserverbase/BUILD.bazel index f029a9f1e5b5..798c085d8242 100644 --- a/pkg/kv/kvserver/kvserverbase/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverbase/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", + "@com_github_pkg_errors//:errors", "@org_golang_x_time//rate", ], ) diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 20f1f84d8050..81aaea229dae 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/redact" + "github.com/pkg/errors" ) // LeaseQueueEnabled is a setting that controls whether the lease queue @@ -115,6 +116,38 @@ var MVCCGCQueueEnabled = settings.RegisterBoolSetting( true, ) +var ( + // defaultQueueMaxSize is the default max size for a queue. 
+	defaultQueueMaxSize = int64(10000)
+)
+
+var BaseQueueMaxSize = settings.RegisterIntSetting(
+	settings.SystemOnly,
+	"kv.base_queue.max_size",
+	"controls the max size of every base queue other than the replicate queue "+
+		"(see kv.replicate_queue.max_size); a queue starts dropping replicas once its max size is exceeded",
+	defaultQueueMaxSize,
+	settings.WithValidateInt(func(v int64) error {
+		if v < defaultQueueMaxSize {
+			return errors.Errorf("cannot be set to a value lower than %d: %d", defaultQueueMaxSize, v)
+		}
+		return nil
+	}),
+)
+
+var ReplicateQueueMaxSize = settings.RegisterIntSetting(
+	settings.ApplicationLevel,
+	"kv.replicate_queue.max_size",
+	"controls max size of the replicate queue. Replicate queue starts dropping replicas when exceeding the max size.",
+	defaultQueueMaxSize,
+	settings.WithValidateInt(func(v int64) error {
+		if v < defaultQueueMaxSize {
+			return errors.Errorf("cannot be set to a value lower than %d: %d", defaultQueueMaxSize, v)
+		}
+		return nil
+	}),
+)
+
 var allowMMA = envutil.EnvOrDefaultBool("COCKROACH_ALLOW_MMA", false)
 
 // LoadBasedRebalancingMode controls whether range rebalancing takes
diff --git a/pkg/kv/kvserver/lease_queue.go b/pkg/kv/kvserver/lease_queue.go
index 70e43ca5073d..a072ebb4b58b 100644
--- a/pkg/kv/kvserver/lease_queue.go
+++ b/pkg/kv/kvserver/lease_queue.go
@@ -86,7 +86,7 @@ func newLeaseQueue(store *Store, allocator allocatorimpl.Allocator) *leaseQueue
 	lq.baseQueue = newBaseQueue("lease", lq, store,
 		queueConfig{
-			maxSize:              defaultQueueMaxSize,
+			maxSize:              kvserverbase.BaseQueueMaxSize,
 			needsLease:           true,
 			needsSpanConfigs:     true,
 			acceptsUnsplitRanges: false,
diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go
index 62f312637e32..ae182a0ce95f 100644
--- a/pkg/kv/kvserver/merge_queue.go
+++ b/pkg/kv/kvserver/merge_queue.go
@@ -105,7 +105,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
 	mq.baseQueue = newBaseQueue(
 		"merge", mq, store,
 		queueConfig{
-			maxSize:        defaultQueueMaxSize,
+			maxSize:        kvserverbase.BaseQueueMaxSize,
 			maxConcurrency: mergeQueueConcurrency,
 			// TODO(ajwerner): Sometimes the merge queue needs to send multiple
 			// snapshots, but the timeout function here is configured based on the
diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go
index e29b8d7c31e6..dd386a7ec4b0 100644
--- a/pkg/kv/kvserver/mvcc_gc_queue.go
+++ b/pkg/kv/kvserver/mvcc_gc_queue.go
@@ -184,7 +184,7 @@ func newMVCCGCQueue(store *Store) *mvccGCQueue {
 	mgcq.baseQueue = newBaseQueue(
 		"mvccGC", mgcq, store,
 		queueConfig{
-			maxSize:              defaultQueueMaxSize,
+			maxSize:              kvserverbase.BaseQueueMaxSize,
 			needsLease:           true,
 			needsSpanConfigs:     true,
 			acceptsUnsplitRanges: false,
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index 1d9f82d01465..b5cac2884b06 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -41,8 +41,6 @@ const (
 	// The timeout prevents a queue from getting stuck on a replica.
 	// For example, a replica whose range is not reachable for quorum.
 	defaultProcessTimeout = 1 * time.Minute
-	// defaultQueueMaxSize is the default max size for a queue.
-	defaultQueueMaxSize = 10000
 )
 
 // queueGuaranteedProcessingTimeBudget is the smallest amount of time before
@@ -324,7 +322,7 @@ type queueProcessTimeoutFunc func(*cluster.Settings, replicaInQueue) time.Durati
 type queueConfig struct {
 	// maxSize is the maximum number of replicas to queue.
-	maxSize int
+	maxSize *settings.IntSetting
 	// maxConcurrency is the maximum number of replicas that can be processed
 	// concurrently. If not set, defaults to 1.
 	maxConcurrency int
@@ -474,6 +472,7 @@ type baseQueue struct {
 		purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
 		stopped   bool
 		disabled  bool
+		maxSize   int64
 	}
 }
@@ -526,11 +525,14 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b
 		},
 	}
 	bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}
+	bq.SetMaxSize(cfg.maxSize.Get(&store.cfg.Settings.SV))
+	cfg.maxSize.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
+		bq.SetMaxSize(cfg.maxSize.Get(&store.cfg.Settings.SV))
+	})
 	bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
 	cfg.disabledConfig.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
 		bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
 	})
-
 	return &bq
 }
@@ -570,6 +572,12 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
 	bq.mu.Unlock()
 }
 
+func (bq *baseQueue) SetMaxSize(maxSize int64) {
+	bq.mu.Lock()
+	bq.mu.maxSize = maxSize
+	bq.mu.Unlock()
+}
+
 // lockProcessing locks all processing in the baseQueue. It returns
 // a function to unlock processing.
 func (bq *baseQueue) lockProcessing() func() {
@@ -832,7 +840,7 @@ func (bq *baseQueue) addInternal(
 	// guaranteed to be globally ordered. Ideally, we would remove the lowest
 	// priority element, but it would require additional bookkeeping or a linear
 	// scan.
-	if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize {
+	if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
 		replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
 		bq.droppedDueToQSize.Inc(1)
 		log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
 			priority, replicaItemToDrop.replicaID)
diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go
index 008358eb0a2c..55d40c74e0a0 100644
--- a/pkg/kv/kvserver/raft_log_queue.go
+++ b/pkg/kv/kvserver/raft_log_queue.go
@@ -167,7 +167,7 @@ func newRaftLogQueue(store *Store, db *kv.DB) *raftLogQueue {
 	rlq.baseQueue = newBaseQueue(
 		"raftlog", rlq, store,
 		queueConfig{
-			maxSize:          defaultQueueMaxSize,
+			maxSize:          kvserverbase.BaseQueueMaxSize,
 			maxConcurrency:   raftLogQueueConcurrency,
 			needsLease:       false,
 			needsSpanConfigs: false,
diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go
index 643f3edf45a8..0d2ca5c9e74f 100644
--- a/pkg/kv/kvserver/raft_snapshot_queue.go
+++ b/pkg/kv/kvserver/raft_snapshot_queue.go
@@ -41,7 +41,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
 	rq.baseQueue = newBaseQueue(
 		"raftsnapshot", rq, store,
 		queueConfig{
-			maxSize: defaultQueueMaxSize,
+			maxSize: kvserverbase.BaseQueueMaxSize,
 			// The Raft leader (which sends Raft snapshots) may not be the
 			// leaseholder. Operating on a replica without holding the lease is the
 			// reason Raft snapshots cannot be performed by the replicateQueue.
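To make the dynamic max size above easier to follow, here is a minimal, self-contained Go sketch of the pattern the patch uses: the queue caches the setting's current value under its own mutex and refreshes it from an on-change hook, so the add path only ever reads the cached field. The intSetting and boundedQueue types below are invented stand-ins for illustration, not the real settings.IntSetting or baseQueue API.

package main

import (
	"fmt"
	"sync"
)

// intSetting is a stand-in for a cluster setting holding an int64. It only
// models the Get/SetOnChange pieces the queue relies on.
type intSetting struct {
	mu       sync.Mutex
	value    int64
	onChange []func()
}

func (s *intSetting) Get() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value
}

func (s *intSetting) SetOnChange(fn func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.onChange = append(s.onChange, fn)
}

// set updates the value and invokes the registered hooks outside the lock.
func (s *intSetting) set(v int64) {
	s.mu.Lock()
	s.value = v
	fns := append([]func(){}, s.onChange...)
	s.mu.Unlock()
	for _, fn := range fns {
		fn()
	}
}

// boundedQueue mirrors the baseQueue idea: the max size lives under the
// queue's mutex and is refreshed whenever the backing setting changes.
type boundedQueue struct {
	mu struct {
		sync.Mutex
		maxSize int64
	}
}

func newBoundedQueue(maxSize *intSetting) *boundedQueue {
	q := &boundedQueue{}
	q.setMaxSize(maxSize.Get())
	maxSize.SetOnChange(func() { q.setMaxSize(maxSize.Get()) })
	return q
}

func (q *boundedQueue) setMaxSize(v int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.mu.maxSize = v
}

func main() {
	setting := &intSetting{value: 10000}
	q := newBoundedQueue(setting)
	setting.set(20000) // simulate an operator bumping the cluster setting
	q.mu.Lock()
	fmt.Println(q.mu.maxSize) // prints 20000
	q.mu.Unlock()
}

Caching the value under the queue's mutex (rather than reading the setting on every add) keeps the size check consistent with the queue length read under the same lock, which is presumably why the patch stores maxSize in bq.mu instead of consulting cfg.maxSize directly.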
diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go
index 57df77564340..162e36063516 100644
--- a/pkg/kv/kvserver/replica_gc_queue.go
+++ b/pkg/kv/kvserver/replica_gc_queue.go
@@ -94,7 +94,7 @@ func newReplicaGCQueue(store *Store, db *kv.DB) *replicaGCQueue {
 	rgcq.baseQueue = newBaseQueue(
 		"replicaGC", rgcq, store,
 		queueConfig{
-			maxSize:              defaultQueueMaxSize,
+			maxSize:              kvserverbase.BaseQueueMaxSize,
 			needsLease:           false,
 			needsSpanConfigs:     false,
 			acceptsUnsplitRanges: true,
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 5fc31ab090fb..cd224e0d2477 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -113,19 +113,6 @@ var PriorityInversionRequeue = settings.RegisterBoolSetting(
 	false,
 )
 
-var ReplicateQueueMaxSize = settings.RegisterIntSetting(
-	settings.ApplicationLevel,
-	"kv.replicate_queue.max_size",
-	"controls max size of the replicate queue. Replicate queue starts dropping replicas when exceeding the max size.",
-	defaultQueueMaxSize,
-	settings.WithValidateInt(func(v int64) error {
-		if v < defaultQueueMaxSize {
-			return errors.Errorf("cannot be set to a value lower than %d: %d", defaultQueueMaxSize, v)
-		}
-		return nil
-	}),
-)
-
 var (
 	metaReplicateQueueAddReplicaCount = metric.Metadata{
 		Name: "queue.replicate.addreplica",
@@ -772,7 +759,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 	rq.baseQueue = newBaseQueue(
 		"replicate", rq, store,
 		queueConfig{
-			maxSize:              int(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV)),
+			maxSize:              kvserverbase.ReplicateQueueMaxSize,
 			needsLease:           true,
 			needsSpanConfigs:     true,
 			acceptsUnsplitRanges: false,
diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go
index ac4675d4238e..893485359035 100644
--- a/pkg/kv/kvserver/split_queue.go
+++ b/pkg/kv/kvserver/split_queue.go
@@ -127,7 +127,7 @@ func newSplitQueue(store *Store, db *kv.DB) *splitQueue {
 	sq.baseQueue = newBaseQueue(
 		"split", sq, store,
 		queueConfig{
-			maxSize:          defaultQueueMaxSize,
+			maxSize:          kvserverbase.BaseQueueMaxSize,
 			maxConcurrency:   splitQueueConcurrency,
 			needsLease:       true,
 			needsSpanConfigs: true,
diff --git a/pkg/kv/kvserver/ts_maintenance_queue.go b/pkg/kv/kvserver/ts_maintenance_queue.go
index de7a89fed903..e2047accee38 100644
--- a/pkg/kv/kvserver/ts_maintenance_queue.go
+++ b/pkg/kv/kvserver/ts_maintenance_queue.go
@@ -99,7 +99,7 @@ func newTimeSeriesMaintenanceQueue(
 	q.baseQueue = newBaseQueue(
 		"timeSeriesMaintenance", q, store,
 		queueConfig{
-			maxSize:              defaultQueueMaxSize,
+			maxSize:              kvserverbase.BaseQueueMaxSize,
 			needsLease:           true,
 			needsSpanConfigs:     false,
 			acceptsUnsplitRanges: true,

From 12aff875454d0b097e646be5aacc88fd802ebecd Mon Sep 17 00:00:00 2001
From: wenyihu6
Date: Thu, 28 Aug 2025 20:15:40 -0400
Subject: [PATCH 8/8] kvserver: generalize queue enqueue metrics and drop handling

---
 pkg/kv/kvserver/metrics.go |  3 ---
 pkg/kv/kvserver/queue.go   | 13 +++++++++----
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index 37a78e903052..9c7780de8837 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -3182,9 +3182,6 @@ type StoreMetrics struct {
 	ReplicaGCQueueFailures        *metric.Counter
 	ReplicaGCQueuePending         *metric.Gauge
 	ReplicaGCQueueProcessingNanos *metric.Counter
-	ReplicateQueueEnqueueFailure  *metric.Counter
-	ReplicateQueueEnqueueSuccess  *metric.Counter
-	ReplicateQueueEnqueueDropped  *metric.Counter
 	ReplicateQueueSuccesses       *metric.Counter
 	ReplicateQueueFailures        *metric.Counter
 	ReplicateQueuePending         *metric.Gauge
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index b5cac2884b06..aa47e425c888 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -359,6 +359,9 @@ type queueConfig struct {
 	// failures is a counter of replicas which failed processing.
 	failures          *metric.Counter
 	droppedDueToQSize *metric.Counter
+	enqueueRate       *metric.Counter
+	enqueueFailure    *metric.Counter
+	enqueueSuccess    *metric.Counter
 	// pending is a gauge measuring current replica count pending.
 	pending *metric.Gauge
 	//
@@ -842,9 +845,11 @@ func (bq *baseQueue) addInternal(
 	// scan.
 	if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
 		replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
-		bq.droppedDueToQSize.Inc(1)
-		log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
-			priority, replicaItemToDrop.replicaID)
+		if bq.droppedDueToQSize != nil {
+			bq.droppedDueToQSize.Inc(1)
+		}
+		log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v, pqLen=%v > maxSize=%v",
+			priority, replicaItemToDrop.replicaID, pqLen, bq.mu.maxSize)
 		for _, cb := range replicaItemToDrop.callbacks {
 			cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize)
 		}
@@ -1409,7 +1414,7 @@ func (bq *baseQueue) processReplicasInPurgatory(
 			if stopper.RunTask(
 				annotatedCtx, bq.processOpName(), func(ctx context.Context) {
 					err = bq.processReplica(ctx, repl, item.priority /*priorityAtEnqueue*/)
-					bq.finishProcessingReplica(ctx, stopper, repl, err)
+					bq.finishProcessingReplica(ctx, stopper, repl, errors.Wrap(err, "purgatory queue"))
 				},
 			) != nil {
 				// NB: We do not need to worry about removing any unprocessed replicas
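The drop-on-overflow path in addInternal can be seen in isolation in the Go sketch below: a bounded priority queue evicts an entry when it grows past its max size and reports errDroppedDueToFullQueueSize through that entry's enqueue callback, loosely mirroring the cb.onEnqueueResult(-1, ...) call above. The names here (boundedPQ, item, notify) are invented for illustration; unlike baseQueue, which drops the last heap element to avoid a linear scan, this sketch keeps a sorted slice and drops the true lowest-priority entry.

package main

import (
	"errors"
	"fmt"
	"sort"
)

var errDroppedDueToFullQueueSize = errors.New("dropped due to full queue size")

// item is a queued entry plus the callback to notify if it is dropped; the
// onEnqueueResult signature loosely mirrors the processCallback in the patch.
type item struct {
	id              int
	priority        float64
	onEnqueueResult func(indexOnHeap int, err error)
}

// boundedPQ keeps items sorted by descending priority and never holds more
// than maxSize entries; the lowest-priority entry is dropped on overflow and
// its callback is told why.
type boundedPQ struct {
	maxSize int
	items   []item
}

func (q *boundedPQ) add(it item) {
	q.items = append(q.items, it)
	sort.Slice(q.items, func(i, j int) bool { return q.items[i].priority > q.items[j].priority })
	if len(q.items) > q.maxSize {
		dropped := q.items[len(q.items)-1]
		q.items = q.items[:len(q.items)-1]
		if dropped.onEnqueueResult != nil {
			dropped.onEnqueueResult(-1 /* indexOnHeap */, errDroppedDueToFullQueueSize)
		}
	}
}

func main() {
	q := &boundedPQ{maxSize: 2}
	notify := func(id int) func(int, error) {
		return func(_ int, err error) { fmt.Printf("replica %d: %v\n", id, err) }
	}
	q.add(item{id: 1, priority: 100, onEnqueueResult: notify(1)})
	q.add(item{id: 2, priority: 900, onEnqueueResult: notify(2)})
	q.add(item{id: 3, priority: 500, onEnqueueResult: notify(3)})
	// prints: replica 1: dropped due to full queue size
}

Surfacing the drop through the enqueue callback, rather than only through a counter, is what lets callers such as the decommissioning nudger in patch 6 observe that their high-priority enqueue never made it into the queue.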