diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml index 7c1437381bf5..fd28442881ff 100644 --- a/docs/generated/metrics/metrics.yaml +++ b/docs/generated/metrics/metrics.yaml @@ -13903,6 +13903,38 @@ layers: unit: COUNT aggregation: AVG derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.enqueue.add + exported_name: queue_replicate_enqueue_add + description: Number of replicas successfully added to the replicate queue + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.enqueue.failedprecondition + exported_name: queue_replicate_enqueue_failedprecondition + description: Number of replicas that failed the precondition checks and were therefore not added to the replicate queue + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.enqueue.noaction + exported_name: queue_replicate_enqueue_noaction + description: Number of replicas for which ShouldQueue determined no action was needed and were therefore not added to the replicate queue + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: queue.replicate.enqueue.unexpectederror + exported_name: queue_replicate_enqueue_unexpectederror + description: Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to add to the replicate queue directly), but failed to be enqueued due to unexpected errors + y_axis_label: Replicas + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE - name: queue.replicate.nonvoterpromotions exported_name: queue_replicate_nonvoterpromotions description: Number of non-voters promoted to voters by the replicate queue @@ -15342,10 +15374,46 @@ layers: unit: COUNT aggregation: AVG derivative: NON_NEGATIVE_DERIVATIVE + - name: ranges.decommissioning.nudger.enqueue.failure + exported_name: ranges_decommissioning_nudger_enqueue_failure + labeled_name: ranges.decommissioning.nudger.enqueue.failure + description: Number of ranges that the decommissioning nudger failed to enqueue in the replicate queue + y_axis_label: Ranges + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: ranges.decommissioning.nudger.enqueue.success + exported_name: ranges_decommissioning_nudger_enqueue_success + labeled_name: ranges.decommissioning.nudger.enqueue.success + description: Number of ranges that were successfully enqueued by the decommissioning nudger + y_axis_label: Ranges + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE - name: ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease exported_name: ranges_decommissioning_nudger_not_leaseholder_or_invalid_lease labeled_name: ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease - description: Number of enqueues of a range for decommissioning by the decommissioning nudger that were not the leaseholder or had an invalid lease + description: Number of ranges skipped by the decommissioning nudger because the replica was not the leaseholder or had an invalid lease + y_axis_label: Ranges + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: ranges.decommissioning.nudger.process.failure + exported_name: ranges_decommissioning_nudger_process_failure + labeled_name: ranges.decommissioning.nudger.process.failure + description: Number of ranges enqueued by the 
decommissioning nudger that the replicate queue failed to process + y_axis_label: Ranges + type: COUNTER + unit: COUNT + aggregation: AVG + derivative: NON_NEGATIVE_DERIVATIVE + - name: ranges.decommissioning.nudger.process.success + exported_name: ranges_decommissioning_nudger_process_success + labeled_name: ranges.decommissioning.nudger.process.success + description: Number of ranges enqueued by the decommissioning nudger that were successfully processed by the replicate queue y_axis_label: Ranges type: COUNTER unit: COUNT aggregation: AVG derivative: NON_NEGATIVE_DERIVATIVE diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 4d7492d6b1ea..f03e6083bca3 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -180,9 +180,37 @@ var ( LabeledName: "ranges.decommissioning.nudger.enqueue", StaticLabels: metric.MakeLabelPairs(metric.LabelStatus, "enqueue"), } + metaDecommissioningNudgerEnqueueSuccess = metric.Metadata{ + Name: "ranges.decommissioning.nudger.enqueue.success", + Help: "Number of ranges that were successfully enqueued by the decommissioning nudger", + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + LabeledName: "ranges.decommissioning.nudger.enqueue.success", + } + metaDecommissioningNudgerEnqueueFailure = metric.Metadata{ + Name: "ranges.decommissioning.nudger.enqueue.failure", + Help: "Number of ranges that the decommissioning nudger failed to enqueue in the replicate queue", + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + LabeledName: "ranges.decommissioning.nudger.enqueue.failure", + } + metaDecommissioningNudgerProcessSuccess = metric.Metadata{ + Name: "ranges.decommissioning.nudger.process.success", + Help: "Number of ranges enqueued by the decommissioning nudger that were successfully processed by the replicate queue", + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + LabeledName: "ranges.decommissioning.nudger.process.success", + } + metaDecommissioningNudgerProcessFailure = metric.Metadata{ + Name: "ranges.decommissioning.nudger.process.failure", + Help: "Number of ranges enqueued by the decommissioning nudger that the replicate queue failed to process", + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + LabeledName: "ranges.decommissioning.nudger.process.failure", + } metaDecommissioningNudgerNotLeaseholderOrInvalidLease = metric.Metadata{ Name: "ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease", - Help: "Number of enqueues of a range for decommissioning by the decommissioning nudger that were not the leaseholder or had an invalid lease", + Help: "Number of ranges skipped by the decommissioning nudger because the replica was not the leaseholder or had an invalid lease", Measurement: "Ranges", Unit: metric.Unit_COUNT, LabeledName: "ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease", @@ -2144,6 +2172,33 @@ The messages are dropped to help these replicas to recover from I/O overload.`, Measurement: "Processing Time", Unit: metric.Unit_NANOSECONDS, } + metaReplicateQueueEnqueueAdd = metric.Metadata{ + Name: "queue.replicate.enqueue.add", + Help: "Number of replicas successfully added to the replicate queue", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueEnqueueFailedPrecondition = metric.Metadata{ + Name: "queue.replicate.enqueue.failedprecondition", + Help: "Number of replicas that failed the precondition checks and were therefore not added to the replicate " + + "queue", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueEnqueueNoAction = metric.Metadata{ + Name: "queue.replicate.enqueue.noaction", + Help: "Number of 
replicas for which ShouldQueue determined no action was needed and were therefore not " + + "added to the replicate queue", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueEnqueueUnexpectedError = metric.Metadata{ + Name: "queue.replicate.enqueue.unexpectederror", + Help: "Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to " + + "add to the replicate queue directly), but failed to be enqueued due to unexpected errors", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } metaLeaseQueueSuccesses = metric.Metadata{ Name: "queue.lease.process.success", Help: "Number of replicas successfully processed by the replica lease queue", @@ -2892,6 +2947,10 @@ type StoreMetrics struct { // Decommissioning nudger metrics. DecommissioningNudgerEnqueue *metric.Counter + DecommissioningNudgerEnqueueSuccess *metric.Counter + DecommissioningNudgerEnqueueFailure *metric.Counter + DecommissioningNudgerProcessSuccess *metric.Counter + DecommissioningNudgerProcessFailure *metric.Counter DecommissioningNudgerNotLeaseholderOrInvalidLease *metric.Counter // Lease request metrics for successful and failed lease requests. These @@ -3188,6 +3247,10 @@ type StoreMetrics struct { ReplicaGCQueueFailures *metric.Counter ReplicaGCQueuePending *metric.Gauge ReplicaGCQueueProcessingNanos *metric.Counter + ReplicateQueueEnqueueAdd *metric.Counter + ReplicateQueueEnqueueFailedPrecondition *metric.Counter + ReplicateQueueEnqueueNoAction *metric.Counter + ReplicateQueueEnqueueUnexpectedError *metric.Counter ReplicateQueueSuccesses *metric.Counter ReplicateQueueFailures *metric.Counter ReplicateQueuePending *metric.Gauge @@ -3616,6 +3679,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // Decommissioning nuder metrics. DecommissioningNudgerEnqueue: metric.NewCounter(metaDecommissioningNudgerEnqueue), + DecommissioningNudgerEnqueueSuccess: metric.NewCounter(metaDecommissioningNudgerEnqueueSuccess), + DecommissioningNudgerEnqueueFailure: metric.NewCounter(metaDecommissioningNudgerEnqueueFailure), + DecommissioningNudgerProcessSuccess: metric.NewCounter(metaDecommissioningNudgerProcessSuccess), + DecommissioningNudgerProcessFailure: metric.NewCounter(metaDecommissioningNudgerProcessFailure), DecommissioningNudgerNotLeaseholderOrInvalidLease: metric.NewCounter(metaDecommissioningNudgerNotLeaseholderOrInvalidLease), // Lease request metrics. 
@@ -3978,6 +4045,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { ReplicaGCQueueFailures: metric.NewCounter(metaReplicaGCQueueFailures), ReplicaGCQueuePending: metric.NewGauge(metaReplicaGCQueuePending), ReplicaGCQueueProcessingNanos: metric.NewCounter(metaReplicaGCQueueProcessingNanos), + ReplicateQueueEnqueueAdd: metric.NewCounter(metaReplicateQueueEnqueueAdd), + ReplicateQueueEnqueueFailedPrecondition: metric.NewCounter(metaReplicateQueueEnqueueFailedPrecondition), + ReplicateQueueEnqueueNoAction: metric.NewCounter(metaReplicateQueueEnqueueNoAction), + ReplicateQueueEnqueueUnexpectedError: metric.NewCounter(metaReplicateQueueEnqueueUnexpectedError), ReplicateQueueSuccesses: metric.NewCounter(metaReplicateQueueSuccesses), ReplicateQueueFailures: metric.NewCounter(metaReplicateQueueFailures), ReplicateQueuePending: metric.NewGauge(metaReplicateQueuePending), diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index af261dac4ead..84f49f9f67cf 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -388,6 +388,20 @@ type queueConfig struct { processDestroyedReplicas bool // processTimeout returns the timeout for processing a replica. processTimeoutFunc queueProcessTimeoutFunc + // enqueueAdd is a counter of replicas that were successfully added to the + // queue. + enqueueAdd *metric.Counter + // enqueueFailedPrecondition is a counter of replicas that failed the + // precondition checks and were therefore not added to the queue. + enqueueFailedPrecondition *metric.Counter + // enqueueNoAction is a counter of replicas for which ShouldQueue determined no + // action was needed and that were therefore not added to the queue. + enqueueNoAction *metric.Counter + // enqueueUnexpectedError is a counter of replicas that were expected to be + // enqueued (either had ShouldQueue return true or the caller explicitly + // requested that the replica be added to the queue directly), but failed to be + // enqueued during the enqueue process (for example, because Async was rate limited). + enqueueUnexpectedError *metric.Counter // successes is a counter of replicas processed successfully. successes *metric.Counter // failures is a counter of replicas which failed processing. @@ -733,6 +747,7 @@ func (bq *baseQueue) AddAsyncWithCallback( h.Add(ctx, repl, prio, cb) }); err != nil { cb.onEnqueueResult(-1 /*indexOnHeap*/, err) + bq.updateMetricsOnEnqueueUnexpectedError() } } @@ -740,9 +755,54 @@ func (bq *baseQueue) AddAsyncWithCallback( // for other operations to finish instead of turning into a noop (because // unlikely MaybeAdd, Add is not subject to being called opportunistically). func (bq *baseQueue) AddAsync(ctx context.Context, repl replicaInQueue, prio float64) { - _ = bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) { + if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) { h.Add(ctx, repl, prio, noopProcessCallback) - }) + }); err != nil { + // We don't update metrics in MaybeAddAsync because we don't know if the + // replica should be queued at this point. We only count it as an unexpected + // error when we're certain the replica should be enqueued. In this case, + // the caller explicitly wants to add the replica to the queue directly, so + // we do update the metrics on unexpected error. 
+ bq.updateMetricsOnEnqueueUnexpectedError() + } +} + +// updateMetricsOnEnqueueFailedPrecondition updates the metrics when a replica +// fails precondition checks (replicaCanBeProcessed) and should not be +// considered for enqueueing. This may include cases where the replica does not +// have a valid lease, is uninitialized, or is destroyed, where the span config +// reader could not be retrieved, or where the range still needs to be split. +func (bq *baseQueue) updateMetricsOnEnqueueFailedPrecondition() { + if bq.enqueueFailedPrecondition != nil { + bq.enqueueFailedPrecondition.Inc(1) + } +} + +// updateMetricsOnEnqueueNoAction updates the metrics when shouldQueue +// determines no action is needed and the replica is not added to the queue. +func (bq *baseQueue) updateMetricsOnEnqueueNoAction() { + if bq.enqueueNoAction != nil { + bq.enqueueNoAction.Inc(1) + } +} + +// updateMetricsOnEnqueueUnexpectedError updates the metrics when an unexpected +// error occurs during enqueue operations. This should be called for replicas +// that were expected to be enqueued (either had ShouldQueue return true or the +// caller explicitly requested that the replica be added to the queue directly), but failed +// to be enqueued during the enqueue process (for example, because Async was rate limited). +func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() { + if bq.enqueueUnexpectedError != nil { + bq.enqueueUnexpectedError.Inc(1) + } +} + +// updateMetricsOnEnqueueAdd updates the metrics when a replica is successfully +// added to the queue. +func (bq *baseQueue) updateMetricsOnEnqueueAdd() { + if bq.enqueueAdd != nil { + bq.enqueueAdd.Inc(1) + } } func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) { @@ -779,6 +839,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. // Load the system config if it's needed. confReader, err := bq.replicaCanBeProcessed(ctx, repl, false /* acquireLeaseIfNeeded */) if err != nil { + bq.updateMetricsOnEnqueueFailedPrecondition() return } @@ -788,6 +849,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. realRepl, _ := repl.(*Replica) should, priority := bq.impl.shouldQueue(ctx, now, realRepl, confReader) if !should { + bq.updateMetricsOnEnqueueNoAction() return } @@ -795,10 +857,12 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. if extConf != nil && extConf.Get(&bq.store.cfg.Settings.SV) { hasExternal, err := realRepl.HasExternalBytes() if err != nil { + bq.updateMetricsOnEnqueueUnexpectedError() log.Dev.Warningf(ctx, "could not determine if %s has external bytes: %s", realRepl, err) return } if hasExternal { + bq.updateMetricsOnEnqueueUnexpectedError() log.Dev.VInfof(ctx, 1, "skipping %s for %s because it has external bytes", bq.name, realRepl) return } @@ -841,8 +905,10 @@ func (bq *baseQueue) addInternal( cb processCallback, ) (added bool, err error) { defer func() { + // INVARIANT: added => err == nil. if err != nil { cb.onEnqueueResult(-1 /* indexOnHeap */, err) + bq.updateMetricsOnEnqueueUnexpectedError() } }() // NB: this is intentionally outside of bq.mu to avoid having to consider @@ -935,6 +1001,10 @@ func (bq *baseQueue) addInternal( default: // No need to signal again. } + // Note that we bump enqueueAdd here instead of in the defer to avoid counting + // the requeue of a replica that is already being processed as a new addition; + // such replicas are re-added to the queue later, which would otherwise double count them. + bq.updateMetricsOnEnqueueAdd() // Note: it may already be dropped or dropped afterwards. 
cb.onEnqueueResult(item.index /*indexOnHeap*/, nil) return true, nil diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index 86ee121d45c2..f78432a3ee15 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -111,6 +111,8 @@ func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfi cfg.pending = metric.NewGauge(metric.Metadata{Name: "pending"}) cfg.processingNanos = metric.NewCounter(metric.Metadata{Name: "processingnanos"}) cfg.purgatory = metric.NewGauge(metric.Metadata{Name: "purgatory"}) + cfg.enqueueAdd = metric.NewCounter(metric.Metadata{Name: "enqueueadd"}) + cfg.enqueueUnexpectedError = metric.NewCounter(metric.Metadata{Name: "enqueueunexpectederror"}) cfg.disabledConfig = testQueueEnabled return newBaseQueue(name, impl, store, cfg) } @@ -1321,6 +1323,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { t.Fatal("unexpected call to onProcessResult") }, }) + require.Equal(t, bq.enqueueAdd.Count(), int64(1)) require.True(t, queued) }) @@ -1347,6 +1350,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { t.Fatal("unexpected call to onProcessResult") }, }) + require.Equal(t, int64(i+1), bq.enqueueAdd.Count()) require.True(t, queued) } // Set range id back to 1. @@ -1367,6 +1371,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { t.Fatal("unexpected call to onProcessResult") }, }) + require.Equal(t, int64(0), bq.enqueueAdd.Count()) + require.Equal(t, int64(1), bq.enqueueUnexpectedError.Count()) require.False(t, queued) }) t.Run("stopped", func(t *testing.T) { @@ -1385,6 +1391,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { }, }) require.False(t, queued) + require.Equal(t, int64(0), bq.enqueueAdd.Count()) + require.Equal(t, int64(1), bq.enqueueUnexpectedError.Count()) }) t.Run("alreadyqueued", func(t *testing.T) { @@ -1402,6 +1410,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { }, }) require.True(t, queued) + require.Equal(t, int64(1), bq.enqueueAdd.Count()) + require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) // Inserting again on the same range id should fail. queued, _ = bq.testingAddWithCallback(ctx, r, 1.0, processCallback{ @@ -1414,6 +1424,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { }, }) require.False(t, queued) + require.Equal(t, int64(1), bq.enqueueAdd.Count()) + require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) }) t.Run("purgatory", func(t *testing.T) { @@ -1437,6 +1449,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { }, }) require.False(t, queued) + require.Equal(t, int64(0), bq.enqueueAdd.Count()) + require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) }) t.Run("processing", func(t *testing.T) { @@ -1448,7 +1462,7 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { item.setProcessing() bq.addLocked(item) // Inserting a range that is already being processed should not enqueue again. 
- requeued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{ + markedAsRequeued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{ onEnqueueResult: func(indexOnHeap int, err error) { require.Equal(t, -1, indexOnHeap) require.ErrorIs(t, err, errReplicaAlreadyProcessing) @@ -1457,7 +1471,9 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { t.Fatal("unexpected call to onProcessResult") }, }) - require.True(t, requeued) + require.True(t, markedAsRequeued) + require.Equal(t, int64(0), bq.enqueueAdd.Count()) + require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) }) t.Run("fullqueue", func(t *testing.T) { testQueue := &testQueueImpl{} @@ -1477,6 +1493,8 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) { }, }) require.True(t, queued) + require.Equal(t, int64(1), bq.enqueueAdd.Count()) + require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count()) }) } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 19a4512ab7b9..2e01514c1c44 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2969,18 +2969,22 @@ func (r *Replica) maybeEnqueueProblemRange( if err != nil { log.KvDistribution.VInfof(ctx, level, "decommissioning nudger failed to enqueue range %v due to %v", r.Desc(), err) + r.store.metrics.DecommissioningNudgerEnqueueFailure.Inc(1) } else { log.KvDistribution.VInfof(ctx, level, "decommissioning nudger successfully enqueued range %v at index %d", r.Desc(), indexOnHeap) + r.store.metrics.DecommissioningNudgerEnqueueSuccess.Inc(1) } }, onProcessResult: func(err error) { if err != nil { log.KvDistribution.VInfof(ctx, level, "decommissioning nudger failed to process range %v due to %v", r.Desc(), err) + r.store.metrics.DecommissioningNudgerProcessFailure.Inc(1) } else { log.KvDistribution.VInfof(ctx, level, "decommissioning nudger successfully processed replica %s", r.Desc()) + r.store.metrics.DecommissioningNudgerProcessSuccess.Inc(1) } }, }) diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index c9d1b52eb248..6613aca510dc 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -608,14 +608,18 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica // so we use the raftSnapshotQueueTimeoutFunc. This function sets a // timeout based on the range size and the sending rate in addition // to consulting the setting which controls the minimum timeout. 
- processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate), - successes: store.metrics.ReplicateQueueSuccesses, - failures: store.metrics.ReplicateQueueFailures, - pending: store.metrics.ReplicateQueuePending, - full: store.metrics.ReplicateQueueFull, - processingNanos: store.metrics.ReplicateQueueProcessingNanos, - purgatory: store.metrics.ReplicateQueuePurgatory, - disabledConfig: kvserverbase.ReplicateQueueEnabled, + processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate), + enqueueAdd: store.metrics.ReplicateQueueEnqueueAdd, + enqueueFailedPrecondition: store.metrics.ReplicateQueueEnqueueFailedPrecondition, + enqueueNoAction: store.metrics.ReplicateQueueEnqueueNoAction, + enqueueUnexpectedError: store.metrics.ReplicateQueueEnqueueUnexpectedError, + successes: store.metrics.ReplicateQueueSuccesses, + failures: store.metrics.ReplicateQueueFailures, + pending: store.metrics.ReplicateQueuePending, + full: store.metrics.ReplicateQueueFull, + processingNanos: store.metrics.ReplicateQueueProcessingNanos, + purgatory: store.metrics.ReplicateQueuePurgatory, + disabledConfig: kvserverbase.ReplicateQueueEnabled, }, ) rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV)) diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 0fe914d64447..4e03789fadeb 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -2485,8 +2485,10 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) { value = store.Metrics().DecommissioningRangeCount.Value() case "enqueue": value = store.Metrics().DecommissioningNudgerEnqueue.Count() - case "not_leaseholder_or_invalid_lease": - value = store.Metrics().DecommissioningNudgerNotLeaseholderOrInvalidLease.Count() + case "enqueue_success": + value = store.Metrics().DecommissioningNudgerEnqueueSuccess.Count() + case "process_success": + value = store.Metrics().DecommissioningNudgerProcessSuccess.Count() default: t.Fatalf("unknown metric type: %s", metricType) } @@ -2507,9 +2509,10 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) { // Wait for the enqueue logic to trigger and validate metrics were updated. testutils.SucceedsSoon(t, func() error { + // TODO(wenyihu6): is there a race condition here where we might not observe + // decommissioning_ranges increasing? afterDecommissioningRanges := getDecommissioningNudgerMetricValue(t, tc, "decommissioning_ranges") afterEnqueued := getDecommissioningNudgerMetricValue(t, tc, "enqueue") - if afterDecommissioningRanges <= initialDecommissioningRanges { return errors.New("expected DecommissioningRangeCount to increase") } @@ -2532,4 +2535,8 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) { } return nil }) + afterEnqueueSuccess := getDecommissioningNudgerMetricValue(t, tc, "enqueue_success") + require.Greater(t, afterEnqueueSuccess, int64(0)) + afterProcessSuccess := getDecommissioningNudgerMetricValue(t, tc, "process_success") + require.Greater(t, afterProcessSuccess, int64(0)) }
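
The queueConfig additions above follow a simple pattern: each enqueue outcome gets its own optional counter, and the updateMetricsOnEnqueue* helpers only increment a counter that the owning queue actually wired up (in this change only the replicate queue does). The following is a minimal, self-contained Go sketch of that nil-guarded pattern; it is not CockroachDB code, and simpleCounter, enqueueOutcome, queueCounters, and record are hypothetical stand-ins for *metric.Counter and the helpers in queue.go.

// The names below (simpleCounter, enqueueOutcome, queueCounters, record) are
// hypothetical; they only illustrate the nil-guarded counter pattern.
package main

import (
	"fmt"
	"sync/atomic"
)

// simpleCounter stands in for *metric.Counter.
type simpleCounter struct{ n atomic.Int64 }

func (c *simpleCounter) Inc(delta int64) { c.n.Add(delta) }
func (c *simpleCounter) Count() int64    { return c.n.Load() }

// enqueueOutcome mirrors the four outcomes the patch distinguishes.
type enqueueOutcome int

const (
	outcomeAdded enqueueOutcome = iota
	outcomeFailedPrecondition
	outcomeNoAction
	outcomeUnexpectedError
)

// queueCounters mirrors the optional counters added to queueConfig; a nil
// field means the owning queue did not opt in, so increments are skipped.
type queueCounters struct {
	enqueueAdd                *simpleCounter
	enqueueFailedPrecondition *simpleCounter
	enqueueNoAction           *simpleCounter
	enqueueUnexpectedError    *simpleCounter
}

// record bumps the counter for the given outcome, guarding against nil so
// queues that never set the counters keep working unchanged.
func (qc *queueCounters) record(o enqueueOutcome) {
	var c *simpleCounter
	switch o {
	case outcomeAdded:
		c = qc.enqueueAdd
	case outcomeFailedPrecondition:
		c = qc.enqueueFailedPrecondition
	case outcomeNoAction:
		c = qc.enqueueNoAction
	case outcomeUnexpectedError:
		c = qc.enqueueUnexpectedError
	}
	if c != nil {
		c.Inc(1)
	}
}

func main() {
	// Replicate-queue-style config: the interesting counters are wired up.
	replicate := &queueCounters{
		enqueueAdd:             &simpleCounter{},
		enqueueUnexpectedError: &simpleCounter{},
	}
	// Other queues leave every counter nil and are unaffected.
	other := &queueCounters{}

	replicate.record(outcomeAdded)
	replicate.record(outcomeUnexpectedError)
	other.record(outcomeAdded) // no-op: counter is nil

	fmt.Println("enqueue.add:", replicate.enqueueAdd.Count())                         // 1
	fmt.Println("enqueue.unexpectederror:", replicate.enqueueUnexpectedError.Count()) // 1
}

Because the counters default to nil, queues that never opt in need no changes, which is why maybeAdd and addInternal can call the helpers unconditionally.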