8 changes: 8 additions & 0 deletions docs/generated/metrics/metrics.yaml
@@ -13951,6 +13951,14 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NONE
- name: queue.replicate.queue_full
exported_name: queue_replicate_queue_full
description: Number of times a replica was dropped from the queue due to queue fullness
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.rebalancenonvoterreplica
exported_name: queue_replicate_rebalancenonvoterreplica
description: Number of non-voter replica rebalancer-initiated additions attempted by the replicate queue
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -2192,6 +2192,12 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueFull = metric.Metadata{
Name: "queue.replicate.queue_full",
Help: "Number of times a replica was dropped from the queue due to queue fullness",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueProcessingNanos = metric.Metadata{
Name: "queue.replicate.processingnanos",
Help: "Nanoseconds spent processing replicas in the replicate queue",
@@ -3185,6 +3191,7 @@ type StoreMetrics struct {
ReplicateQueueSuccesses *metric.Counter
ReplicateQueueFailures *metric.Counter
ReplicateQueuePending *metric.Gauge
ReplicateQueueFull *metric.Counter
ReplicateQueueProcessingNanos *metric.Counter
ReplicateQueuePurgatory *metric.Gauge
SplitQueueSuccesses *metric.Counter
@@ -3974,6 +3981,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
ReplicateQueueSuccesses: metric.NewCounter(metaReplicateQueueSuccesses),
ReplicateQueueFailures: metric.NewCounter(metaReplicateQueueFailures),
ReplicateQueuePending: metric.NewGauge(metaReplicateQueuePending),
ReplicateQueueFull: metric.NewCounter(metaReplicateQueueFull),
ReplicateQueueProcessingNanos: metric.NewCounter(metaReplicateQueueProcessingNanos),
ReplicateQueuePurgatory: metric.NewGauge(metaReplicateQueuePurgatory),
SplitQueueSuccesses: metric.NewCounter(metaSplitQueueSuccesses),
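For orientation, here is a minimal standalone sketch (not part of this patch) of the counter lifecycle the new metric follows. It uses only calls that appear elsewhere in this diff (metric.Metadata, metric.NewCounter, Inc, Count) and assumes the cockroach util/metric package is importable at its usual path:

package main

import (
    "fmt"

    "github.com/cockroachdb/cockroach/pkg/util/metric"
)

func main() {
    // Metadata mirrors metaReplicateQueueFull above.
    meta := metric.Metadata{
        Name:        "queue.replicate.queue_full",
        Help:        "Number of times a replica was dropped from the queue due to queue fullness",
        Measurement: "Replicas",
        Unit:        metric.Unit_COUNT,
    }
    full := metric.NewCounter(meta)
    full.Inc(1)               // the queue increments this each time it drops a replica
    fmt.Println(full.Count()) // prints 1
}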
29 changes: 28 additions & 1 deletion pkg/kv/kvserver/queue.go
@@ -331,6 +331,11 @@ type queueConfig struct {
failures *metric.Counter
// pending is a gauge measuring current replica count pending.
pending *metric.Gauge
// full is a counter measuring replicas dropped because the queue exceeded its
// max size.
// NB: this metric may be nil for queues that do not track it.
full *metric.Counter
// processingNanos is a counter measuring total nanoseconds spent processing
// replicas.
processingNanos *metric.Counter
@@ -441,6 +446,7 @@ type baseQueue struct {
purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
stopped bool
disabled bool
maxSize int64
}
}

@@ -493,6 +499,7 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b
},
}
bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}
bq.mu.maxSize = int64(cfg.maxSize)
bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
cfg.disabledConfig.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
@@ -537,6 +544,23 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
bq.mu.Unlock()
}

// SetMaxSize sets the max size of the queue.
func (bq *baseQueue) SetMaxSize(maxSize int64) {
bq.mu.Lock()
defer bq.mu.Unlock()
bq.mu.maxSize = maxSize
// Drop replicas until the queue no longer exceeds the max size. Note: we call
// removeLocked to match the behavior of addInternal. In theory, only
// removeFromQueueLocked should be triggered by removeLocked, since an item
// still in the priority queue should be neither processing nor in purgatory.
// To be safe, however, we use removeLocked.
for int64(bq.mu.priorityQ.Len()) > maxSize {
pqLen := bq.mu.priorityQ.Len()
if bq.full != nil {
bq.full.Inc(1)
}
bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
}
}

// lockProcessing locks all processing in the baseQueue. It returns
// a function to unlock processing.
func (bq *baseQueue) lockProcessing() func() {
@@ -772,8 +796,11 @@ func (bq *baseQueue) addInternal(
// guaranteed to be globally ordered. Ideally, we would remove the lowest
// priority element, but it would require additional bookkeeping or a linear
// scan.
if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize {
if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
if bq.full != nil {
bq.full.Inc(1)
}
log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
priority, replicaItemToDrop.replicaID)
bq.removeLocked(replicaItemToDrop)
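The drop in addInternal is best-effort: the heap's backing slice sl is only heap-ordered, so sl[len-1] is not necessarily the lowest-priority element. A standalone, standard-library-only sketch (not part of this patch) of that property:

package main

import (
    "container/heap"
    "fmt"
)

// priorities is a min-heap of priorities; only the root is guaranteed to be
// the extreme element. The tail of the backing slice is not.
type priorities []float64

func (p priorities) Len() int           { return len(p) }
func (p priorities) Less(i, j int) bool { return p[i] < p[j] }
func (p priorities) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p *priorities) Push(x any)        { *p = append(*p, x.(float64)) }
func (p *priorities) Pop() any {
    old := *p
    n := len(old)
    x := old[n-1]
    *p = old[:n-1]
    return x
}

func main() {
    h := &priorities{}
    for _, pri := range []float64{5, 1, 4, 2, 3} {
        heap.Push(h, pri)
    }
    // Root is the minimum (1), but the tail is 3, not the maximum (5):
    // removing the tail is a heuristic, not removal of the extreme element.
    fmt.Println("root:", (*h)[0], "tail:", (*h)[len(*h)-1])
}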
25 changes: 25 additions & 0 deletions pkg/kv/kvserver/replicate_queue.go
@@ -100,6 +100,24 @@ var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetti
0,
)

// ReplicateQueueMaxSize is a setting that controls the max size of the
// replicate queue. When this limit is exceeded, lower priority replicas (not
// guaranteed to be the lowest) are dropped from the queue.
var ReplicateQueueMaxSize = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.replicate_queue.max_size",
"maximum number of replicas that can be queued for replicate queue processing; "+
"when this limit is exceeded, lower priority (not guaranteed to be the lowest) "+
"replicas are dropped from the queue",
defaultQueueMaxSize,
settings.WithValidateInt(func(v int64) error {
if v < defaultQueueMaxSize {
return errors.Errorf("cannot be set to a value lower than %d: %d", defaultQueueMaxSize, v)
}
return nil
}),
)
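A rough standalone sketch (not part of this patch) of the validation rule above: values below the default floor are rejected, so the limit can only be raised. The real floor is the defaultQueueMaxSize constant in queue.go, whose value is not shown in this diff, so a named placeholder stands in for it:

package main

import "fmt"

// assumedDefaultQueueMaxSize is a stand-in for defaultQueueMaxSize in
// queue.go; its actual value is not visible in this diff.
const assumedDefaultQueueMaxSize = 10000

// validate mirrors the WithValidateInt closure registered above.
func validate(v int64) error {
    if v < assumedDefaultQueueMaxSize {
        return fmt.Errorf("cannot be set to a value lower than %d: %d", assumedDefaultQueueMaxSize, v)
    }
    return nil
}

func main() {
    fmt.Println(validate(100))   // error: below the floor
    fmt.Println(validate(20000)) // <nil>: accepted
}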

var (
metaReplicateQueueAddReplicaCount = metric.Metadata{
Name: "queue.replicate.addreplica",
@@ -524,6 +542,7 @@ func (metrics *ReplicateQueueMetrics) trackResultByAllocatorAction(
// additional replica to their range.
type replicateQueue struct {
*baseQueue
maxSize *settings.IntSetting
metrics ReplicateQueueMetrics
allocator allocatorimpl.Allocator
as *mmaintegration.AllocatorSync
@@ -549,6 +568,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
storePool = store.cfg.StorePool
}
rq := &replicateQueue{
maxSize: ReplicateQueueMaxSize,
metrics: makeReplicateQueueMetrics(),
planner: plan.NewReplicaPlanner(allocator, storePool,
store.TestingKnobs().ReplicaPlannerKnobs),
@@ -578,11 +598,16 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
full: store.metrics.ReplicateQueueFull,
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
purgatory: store.metrics.ReplicateQueuePurgatory,
disabledConfig: kvserverbase.ReplicateQueueEnabled,
},
)
rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))
ReplicateQueueMaxSize.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))
})
updateFn := func() {
select {
case rq.updateCh <- timeutil.Now():
64 changes: 64 additions & 0 deletions pkg/kv/kvserver/store_rebalancer_test.go
@@ -1864,3 +1864,67 @@ func TestingRaftStatusFn(desc *roachpb.RangeDescriptor, storeID roachpb.StoreID)
}
return status
}

// TestReplicateQueueMaxSize verifies that replicas are dropped from the
// replicate queue when its max size is exceeded, and that the
// ReplicateQueueFull metric is updated correctly.
func TestReplicateQueueMaxSize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tc := testContext{}
stopper := stop.NewStopper()
ctx := context.Background()
defer stopper.Stop(ctx)
tc.Start(ctx, t, stopper)

r, err := tc.store.GetReplica(1)
require.NoError(t, err)

stopper, _, _, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, true /* deterministic */)
defer stopper.Stop(ctx)
ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 1)
replicateQueue := newReplicateQueue(tc.store, a)

// verify checks the queue length and the dropped-replica counters.
verify := func(expectedLength int, expectedDropped int64) {
require.Equal(t, expectedLength, replicateQueue.Length())
require.Equal(t, expectedDropped, replicateQueue.full.Count())
require.Equal(t, expectedDropped, tc.store.metrics.ReplicateQueueFull.Count())
}

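// addReplicaAndVerify enqueues a replica for the given range ID, then verifies
// the expected queue length and dropped count.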
addReplicaAndVerify := func(rangeID roachpb.RangeID, expectedLength int, expectedDropped int64) {
r.Desc().RangeID = rangeID
enqueued, err := replicateQueue.testingAdd(context.Background(), r, 0.0)
require.NoError(t, err)
require.True(t, enqueued)
verify(expectedLength, expectedDropped)
}

// First replica should be added.
addReplicaAndVerify(1 /* rangeID */, 1 /* expectedLength */, 0 /* expectedDropped */)
// Second replica should be dropped.
addReplicaAndVerify(2 /* rangeID */, 1 /* expectedLength */, 1 /* expectedDropped */)
// Third replica should be dropped.
addReplicaAndVerify(3 /* rangeID */, 1 /* expectedLength */, 2 /* expectedDropped */)

// Increase the max size to 100 and add more replicas.
ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
for i := 2; i <= 100; i++ {
// Should be added.
addReplicaAndVerify(roachpb.RangeID(i+1 /* rangeID */), i /* expectedLength */, 2 /* expectedDropped */)
}

// Add one more to exceed the max size. Should be dropped.
addReplicaAndVerify(102 /* rangeID */, 100 /* expectedLength */, 3 /* expectedDropped */)

// Resetting to the same size should not change the queue length.
ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
verify(100 /* expectedLength */, 3 /* expectedDropped */)

// Decrease the max size to 10, which should drop 90 replicas.
ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 10)
verify(10 /* expectedLength */, 93 /* expectedDropped: 3 + 90 */)

// Should drop another one now that max size is 10.
addReplicaAndVerify(103 /* rangeID */, 10 /* expectedLength */, 94 /* expectedDropped: 3 + 90 + 1 */)
}