diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml
index 599e99e367bf..7c1437381bf5 100644
--- a/docs/generated/metrics/metrics.yaml
+++ b/docs/generated/metrics/metrics.yaml
@@ -13951,6 +13951,14 @@ layers:
       unit: COUNT
       aggregation: AVG
      derivative: NONE
+    - name: queue.replicate.queue_full
+      exported_name: queue_replicate_queue_full
+      description: Number of times a replica was dropped from the queue due to queue fullness
+      y_axis_label: Replicas
+      type: COUNTER
+      unit: COUNT
+      aggregation: AVG
+      derivative: NON_NEGATIVE_DERIVATIVE
     - name: queue.replicate.rebalancenonvoterreplica
       exported_name: queue_replicate_rebalancenonvoterreplica
       description: Number of non-voter replica rebalancer-initiated additions attempted by the replicate queue
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index 9c7780de8837..4d7492d6b1ea 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -2192,6 +2192,12 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
 		Measurement: "Replicas",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaReplicateQueueFull = metric.Metadata{
+		Name:        "queue.replicate.queue_full",
+		Help:        "Number of times a replica was dropped from the queue due to queue fullness",
+		Measurement: "Replicas",
+		Unit:        metric.Unit_COUNT,
+	}
 	metaReplicateQueueProcessingNanos = metric.Metadata{
 		Name:        "queue.replicate.processingnanos",
 		Help:        "Nanoseconds spent processing replicas in the replicate queue",
@@ -3185,6 +3191,7 @@ type StoreMetrics struct {
 	ReplicateQueueSuccesses       *metric.Counter
 	ReplicateQueueFailures        *metric.Counter
 	ReplicateQueuePending         *metric.Gauge
+	ReplicateQueueFull            *metric.Counter
 	ReplicateQueueProcessingNanos *metric.Counter
 	ReplicateQueuePurgatory       *metric.Gauge
 	SplitQueueSuccesses           *metric.Counter
@@ -3974,6 +3981,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 		ReplicateQueueSuccesses:       metric.NewCounter(metaReplicateQueueSuccesses),
 		ReplicateQueueFailures:        metric.NewCounter(metaReplicateQueueFailures),
 		ReplicateQueuePending:         metric.NewGauge(metaReplicateQueuePending),
+		ReplicateQueueFull:            metric.NewCounter(metaReplicateQueueFull),
 		ReplicateQueueProcessingNanos: metric.NewCounter(metaReplicateQueueProcessingNanos),
 		ReplicateQueuePurgatory:       metric.NewGauge(metaReplicateQueuePurgatory),
 		SplitQueueSuccesses:           metric.NewCounter(metaSplitQueueSuccesses),
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index 066f4cb39d4a..cb7e94f4c0c0 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -331,6 +331,11 @@ type queueConfig struct {
 	failures *metric.Counter
 	// pending is a gauge measuring current replica count pending.
 	pending *metric.Gauge
+	// full is a counter measuring replicas dropped due to exceeding the queue max
+	// size.
+	// NB: this metric may be nil for queues that are not interested in tracking
+	// this.
+	full *metric.Counter
 	// processingNanos is a counter measuring total nanoseconds spent processing
 	// replicas.
 	processingNanos *metric.Counter
@@ -441,6 +446,7 @@ type baseQueue struct {
 		purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
 		stopped   bool
 		disabled  bool
+		maxSize   int64
 	}
 }
 
@@ -493,6 +499,7 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b
 		},
 	}
 	bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}
+	bq.mu.maxSize = int64(cfg.maxSize)
 	bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
 	cfg.disabledConfig.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
 		bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
@@ -537,6 +544,23 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
 	bq.mu.Unlock()
 }
 
+// SetMaxSize sets the max size of the queue.
+func (bq *baseQueue) SetMaxSize(maxSize int64) {
+	bq.mu.Lock()
+	defer bq.mu.Unlock()
+	bq.mu.maxSize = maxSize
+	// Drop replicas until the queue no longer exceeds the max size. Note: we
+	// call removeLocked to match the behavior of addInternal. In theory, only
+	// removeFromQueueLocked should be triggered by removeLocked: since the item
+	// is in the priority queue, it should not be processing or in the purgatory
+	// queue. To be safe, however, we use removeLocked.
+	for int64(bq.mu.priorityQ.Len()) > maxSize {
+		pqLen := bq.mu.priorityQ.Len()
+		bq.full.Inc(1)
+		bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
+	}
+}
+
 // lockProcessing locks all processing in the baseQueue. It returns
 // a function to unlock processing.
 func (bq *baseQueue) lockProcessing() func() {
@@ -772,8 +796,11 @@ func (bq *baseQueue) addInternal(
 	// guaranteed to be globally ordered. Ideally, we would remove the lowest
 	// priority element, but it would require additional bookkeeping or a linear
 	// scan.
-	if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize {
+	if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
 		replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
+		if bq.full != nil {
+			bq.full.Inc(1)
+		}
 		log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
 			priority, replicaItemToDrop.replicaID)
 		bq.removeLocked(replicaItemToDrop)
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 0de70b4d3d46..95e4bfb6c6f8 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -100,6 +100,24 @@ var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetti
 	0,
 )
 
+// ReplicateQueueMaxSize is a setting that controls the max size of the
+// replicate queue. When this limit is exceeded, lower-priority replicas (not
+// guaranteed to be the lowest) are dropped from the queue.
+var ReplicateQueueMaxSize = settings.RegisterIntSetting(
+	settings.ApplicationLevel,
+	"kv.replicate_queue.max_size",
+	"maximum number of replicas that can be queued for replicate queue processing; "+
+		"when this limit is exceeded, lower priority (not guaranteed to be the lowest) "+
+		"replicas are dropped from the queue",
+	defaultQueueMaxSize,
+	settings.WithValidateInt(func(v int64) error {
+		if v < defaultQueueMaxSize {
+			return errors.Errorf("cannot be set to a value lower than %d: %d", defaultQueueMaxSize, v)
+		}
+		return nil
+	}),
+)
+
 var (
 	metaReplicateQueueAddReplicaCount = metric.Metadata{
 		Name:        "queue.replicate.addreplica",
@@ -524,6 +542,7 @@ func (metrics *ReplicateQueueMetrics) trackResultByAllocatorAction(
 // additional replica to their range.
 type replicateQueue struct {
 	*baseQueue
+	maxSize   *settings.IntSetting
 	metrics   ReplicateQueueMetrics
 	allocator allocatorimpl.Allocator
 	as        *mmaintegration.AllocatorSync
@@ -549,6 +568,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 		storePool = store.cfg.StorePool
 	}
 	rq := &replicateQueue{
+		maxSize: ReplicateQueueMaxSize,
 		metrics: makeReplicateQueueMetrics(),
 		planner: plan.NewReplicaPlanner(allocator, storePool,
 			store.TestingKnobs().ReplicaPlannerKnobs),
@@ -578,11 +598,16 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 			successes:       store.metrics.ReplicateQueueSuccesses,
 			failures:        store.metrics.ReplicateQueueFailures,
 			pending:         store.metrics.ReplicateQueuePending,
+			full:            store.metrics.ReplicateQueueFull,
 			processingNanos: store.metrics.ReplicateQueueProcessingNanos,
 			purgatory:       store.metrics.ReplicateQueuePurgatory,
 			disabledConfig:  kvserverbase.ReplicateQueueEnabled,
 		},
 	)
+	rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))
+	ReplicateQueueMaxSize.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
+		rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))
+	})
 	updateFn := func() {
 		select {
 		case rq.updateCh <- timeutil.Now():
diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go
index 268bb4da8475..9313f3e8560a 100644
--- a/pkg/kv/kvserver/store_rebalancer_test.go
+++ b/pkg/kv/kvserver/store_rebalancer_test.go
@@ -1864,3 +1864,67 @@ func TestingRaftStatusFn(desc *roachpb.RangeDescriptor, storeID roachpb.StoreID)
 	}
 	return status
 }
+
+// TestReplicateQueueMaxSize tests the max size of the replicate queue and
+// verifies that replicas are dropped when the max size is exceeded. It also
+// checks that the metric ReplicateQueueFull is updated correctly.
+func TestReplicateQueueMaxSize(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	tc := testContext{}
+	stopper := stop.NewStopper()
+	ctx := context.Background()
+	defer stopper.Stop(ctx)
+	tc.Start(ctx, t, stopper)
+
+	r, err := tc.store.GetReplica(1)
+	require.NoError(t, err)
+
+	stopper, _, _, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, true /* deterministic */)
+	defer stopper.Stop(ctx)
+	ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 1)
+	replicateQueue := newReplicateQueue(tc.store, a)
+
+	// Helpers to verify the queue state and to add a replica and then verify.
+	verify := func(expectedLength int, expectedDropped int64) {
+		require.Equal(t, expectedLength, replicateQueue.Length())
+		require.Equal(t, expectedDropped, replicateQueue.full.Count())
+		require.Equal(t, expectedDropped, tc.store.metrics.ReplicateQueueFull.Count())
+	}
+
+	addReplicaAndVerify := func(rangeID roachpb.RangeID, expectedLength int, expectedDropped int64) {
+		r.Desc().RangeID = rangeID
+		enqueued, err := replicateQueue.testingAdd(context.Background(), r, 0.0)
+		require.NoError(t, err)
+		require.True(t, enqueued)
+		verify(expectedLength, expectedDropped)
+	}
+
+	// First replica should be added.
+	addReplicaAndVerify(1 /* rangeID */, 1 /* expectedLength */, 0 /* expectedDropped */)
+	// Second replica should be dropped.
+	addReplicaAndVerify(2 /* rangeID */, 1 /* expectedLength */, 1 /* expectedDropped */)
+	// Third replica should be dropped.
+	addReplicaAndVerify(3 /* rangeID */, 1 /* expectedLength */, 2 /* expectedDropped */)
+
+	// Increase the max size to 100 and add more replicas.
+	ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
+	for i := 2; i <= 100; i++ {
+		// Should be added.
+		addReplicaAndVerify(roachpb.RangeID(i+1 /* rangeID */), i /* expectedLength */, 2 /* expectedDropped */)
+	}
+
+	// Add one more to exceed the max size. Should be dropped.
+	addReplicaAndVerify(102 /* rangeID */, 100 /* expectedLength */, 3 /* expectedDropped */)
+
+	// Resetting to the same max size should not change the queue length.
+	ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
+	verify(100 /* expectedLength */, 3 /* expectedDropped */)
+
+	// Decrease the max size to 10, which should drop 90 replicas.
+	ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 10)
+	verify(10 /* expectedLength */, 93 /* expectedDropped: 3 + 90 */)
+
+	// Should drop another one now that the max size is 10.
+	addReplicaAndVerify(103 /* rangeID */, 10 /* expectedLength */, 94 /* expectedDropped: 3 + 90 + 1 */)
+}
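
Reviewer note: the drop-on-overflow semantics introduced above (addInternal dropping the tail element once the queue exceeds maxSize, and SetMaxSize trimming the tail after the limit is lowered, bumping the "full" counter each time) can be hard to see through the locking and metrics plumbing. Below is a minimal, self-contained Go sketch of just that behavior. It is illustrative only: the type and identifiers are invented for this note, it uses a plain slice instead of the kvserver priority heap, and it omits locking entirely.

package main

import "fmt"

// boundedQueue is a toy stand-in for baseQueue: a slice of items, a max size,
// and a counter of items dropped because the queue was full.
type boundedQueue struct {
	items   []int
	maxSize int
	full    int // analogous to the queue.replicate.queue_full counter
}

// add appends an item, then drops the tail element if the queue now exceeds
// maxSize, mirroring the overflow handling in addInternal.
func (q *boundedQueue) add(item int) {
	q.items = append(q.items, item)
	if len(q.items) > q.maxSize {
		q.full++
		q.items = q.items[:len(q.items)-1]
	}
}

// setMaxSize updates the limit and trims the tail until the queue fits,
// mirroring baseQueue.SetMaxSize.
func (q *boundedQueue) setMaxSize(maxSize int) {
	q.maxSize = maxSize
	for len(q.items) > maxSize {
		q.full++
		q.items = q.items[:len(q.items)-1]
	}
}

func main() {
	q := &boundedQueue{maxSize: 3}
	for i := 1; i <= 5; i++ {
		q.add(i)
	}
	fmt.Println(q.items, q.full) // [1 2 3] 2
	q.setMaxSize(1)
	fmt.Println(q.items, q.full) // [1] 4
}

As in the patch, the element dropped is simply whatever sits at the end of the backing slice, which for the real priority heap is a low-priority element but not necessarily the lowest; the counter only records that something was shed, not which replica.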