Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13903,6 +13903,38 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.enqueue.add
exported_name: queue_replicate_enqueue_add
description: Number of replicas successfully added to the replicate queue
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.enqueue.failedprecondition
exported_name: queue_replicate_enqueue_failedprecondition
description: Number of replicas that failed the precondition checks and were therefore not added to the replicate queue
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.enqueue.noaction
exported_name: queue_replicate_enqueue_noaction
description: Number of replicas for which ShouldQueue determined no action was needed and were therefore not added to the replicate queue
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.enqueue.unexpectederror
exported_name: queue_replicate_enqueue_unexpectederror
description: Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to add to the replicate queue directly), but failed to be enqueued due to unexpected errors
y_axis_label: Replicas
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: queue.replicate.nonvoterpromotions
exported_name: queue_replicate_nonvoterpromotions
description: Number of non-voters promoted to voters by the replicate queue
Expand Down Expand Up @@ -15342,10 +15374,46 @@ layers:
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.enqueue.failure
exported_name: ranges_decommissioning_nudger_enqueue_failure
labeled_name: ranges.decommissioning.nudger.enqueue.failure
description: Number of ranges that failed to enqueue at the replicate queue
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.enqueue.success
exported_name: ranges_decommissioning_nudger_enqueue_success
labeled_name: ranges.decommissioning.nudger.enqueue.success
description: Number of ranges that were successfully enqueued by the decommissioning nudger
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease
exported_name: ranges_decommissioning_nudger_not_leaseholder_or_invalid_lease
labeled_name: ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease
description: Number of enqueues of a range for decommissioning by the decommissioning nudger that were not the leaseholder or had an invalid lease
description: Number of ranges that were not the leaseholder or had an invalid lease at the decommissioning nudger
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.process.failure
exported_name: ranges_decommissioning_nudger_process_failure
labeled_name: ranges.decommissioning.nudger.process.failure
description: Number of ranges enqueued by the decommissioning nudger that failed to process by the replicate queue
y_axis_label: Ranges
type: COUNTER
unit: COUNT
aggregation: AVG
derivative: NON_NEGATIVE_DERIVATIVE
- name: ranges.decommissioning.nudger.process.success
exported_name: ranges_decommissioning_nudger_process_success
labeled_name: ranges.decommissioning.nudger.process.success
description: Number of ranges enqueued by the decommissioning nudger that were successfully processed by the replicate queue
y_axis_label: Ranges
type: COUNTER
unit: COUNT
Expand Down
73 changes: 72 additions & 1 deletion pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,37 @@ var (
LabeledName: "ranges.decommissioning.nudger.enqueue",
StaticLabels: metric.MakeLabelPairs(metric.LabelStatus, "enqueue"),
}
// metaDecommissioningNudgerEnqueueSuccess is the metadata for the
// ranges.decommissioning.nudger.enqueue.success metric, which counts ranges
// that the decommissioning nudger successfully enqueued.
metaDecommissioningNudgerEnqueueSuccess = metric.Metadata{
	Name:        "ranges.decommissioning.nudger.enqueue.success",
	Help:        "Number of ranges that were successfully enqueued by the decommissioning nudger",
	Measurement: "Ranges",
	Unit:        metric.Unit_COUNT,
	LabeledName: "ranges.decommissioning.nudger.enqueue.success",
}
// metaDecommissioningNudgerEnqueueFailure is the metadata for the
// ranges.decommissioning.nudger.enqueue.failure metric, which counts ranges
// that failed to enqueue at the replicate queue.
metaDecommissioningNudgerEnqueueFailure = metric.Metadata{
Name: "ranges.decommissioning.nudger.enqueue.failure",
Help: "Number of ranges that failed to enqueue at the replicate queue",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
LabeledName: "ranges.decommissioning.nudger.enqueue.failure",
}
// metaDecommissioningNudgerProcessSuccess is the metadata for the
// ranges.decommissioning.nudger.process.success metric, which counts ranges
// enqueued by the decommissioning nudger that the replicate queue processed
// successfully.
metaDecommissioningNudgerProcessSuccess = metric.Metadata{
Name: "ranges.decommissioning.nudger.process.success",
Help: "Number of ranges enqueued by the decommissioning nudger that were successfully processed by the replicate queue",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
LabeledName: "ranges.decommissioning.nudger.process.success",
}
// metaDecommissioningNudgerProcessFailure is the metadata for the
// ranges.decommissioning.nudger.process.failure metric, which counts ranges
// enqueued by the decommissioning nudger that the replicate queue failed to
// process.
metaDecommissioningNudgerProcessFailure = metric.Metadata{
Name: "ranges.decommissioning.nudger.process.failure",
Help: "Number of ranges enqueued by the decommissioning nudger that failed to process by the replicate queue",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
LabeledName: "ranges.decommissioning.nudger.process.failure",
}
metaDecommissioningNudgerNotLeaseholderOrInvalidLease = metric.Metadata{
Name: "ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease",
Help: "Number of enqueues of a range for decommissioning by the decommissioning nudger that were not the leaseholder or had an invalid lease",
Help: "Number of ranges that were not the leaseholder or had an invalid lease at the decommissioning nudger",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
LabeledName: "ranges.decommissioning.nudger.not_leaseholder_or_invalid_lease",
Expand Down Expand Up @@ -2144,6 +2172,33 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
Measurement: "Processing Time",
Unit: metric.Unit_NANOSECONDS,
}
// metaReplicateQueueEnqueueAdd is the metadata for the
// queue.replicate.enqueue.add metric, which counts replicas successfully
// added to the replicate queue.
metaReplicateQueueEnqueueAdd = metric.Metadata{
Name: "queue.replicate.enqueue.add",
Help: "Number of replicas successfully added to the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
// metaReplicateQueueEnqueueFailedPrecondition is the metadata for the
// queue.replicate.enqueue.failedprecondition metric, which counts replicas
// that failed the precondition checks and so were not added to the replicate
// queue.
metaReplicateQueueEnqueueFailedPrecondition = metric.Metadata{
Name: "queue.replicate.enqueue.failedprecondition",
Help: "Number of replicas that failed the precondition checks and were therefore not added to the replicate " +
"queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
// metaReplicateQueueEnqueueNoAction is the metadata for the
// queue.replicate.enqueue.noaction metric, which counts replicas for which
// ShouldQueue determined that no action was needed, so they were not added to
// the replicate queue.
metaReplicateQueueEnqueueNoAction = metric.Metadata{
Name: "queue.replicate.enqueue.noaction",
Help: "Number of replicas for which ShouldQueue determined no action was needed and were therefore not " +
"added to the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
// metaReplicateQueueEnqueueUnexpectedError is the metadata for the
// queue.replicate.enqueue.unexpectederror metric, which counts replicas that
// were expected to be enqueued (ShouldQueue returned true, or the caller
// chose to add to the replicate queue directly) but were not, because of an
// unexpected error.
metaReplicateQueueEnqueueUnexpectedError = metric.Metadata{
Name: "queue.replicate.enqueue.unexpectederror",
Help: "Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to " +
"add to the replicate queue directly), but failed to be enqueued due to unexpected errors",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaLeaseQueueSuccesses = metric.Metadata{
Name: "queue.lease.process.success",
Help: "Number of replicas successfully processed by the replica lease queue",
Expand Down Expand Up @@ -2892,6 +2947,10 @@ type StoreMetrics struct {

// Decommissioning nudger metrics.
DecommissioningNudgerEnqueue *metric.Counter
DecommissioningNudgerEnqueueSuccess *metric.Counter
DecommissioningNudgerEnqueueFailure *metric.Counter
DecommissioningNudgerProcessSuccess *metric.Counter
DecommissioningNudgerProcessFailure *metric.Counter
DecommissioningNudgerNotLeaseholderOrInvalidLease *metric.Counter

// Lease request metrics for successful and failed lease requests. These
Expand Down Expand Up @@ -3188,6 +3247,10 @@ type StoreMetrics struct {
ReplicaGCQueueFailures *metric.Counter
ReplicaGCQueuePending *metric.Gauge
ReplicaGCQueueProcessingNanos *metric.Counter
ReplicateQueueEnqueueAdd *metric.Counter
ReplicateQueueEnqueueFailedPrecondition *metric.Counter
ReplicateQueueEnqueueNoAction *metric.Counter
ReplicateQueueEnqueueUnexpectedError *metric.Counter
ReplicateQueueSuccesses *metric.Counter
ReplicateQueueFailures *metric.Counter
ReplicateQueuePending *metric.Gauge
Expand Down Expand Up @@ -3616,6 +3679,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {

// Decommissioning nudger metrics.
DecommissioningNudgerEnqueue: metric.NewCounter(metaDecommissioningNudgerEnqueue),
DecommissioningNudgerEnqueueSuccess: metric.NewCounter(metaDecommissioningNudgerEnqueueSuccess),
DecommissioningNudgerEnqueueFailure: metric.NewCounter(metaDecommissioningNudgerEnqueueFailure),
DecommissioningNudgerProcessSuccess: metric.NewCounter(metaDecommissioningNudgerProcessSuccess),
DecommissioningNudgerProcessFailure: metric.NewCounter(metaDecommissioningNudgerProcessFailure),
DecommissioningNudgerNotLeaseholderOrInvalidLease: metric.NewCounter(metaDecommissioningNudgerNotLeaseholderOrInvalidLease),

// Lease request metrics.
Expand Down Expand Up @@ -3978,6 +4045,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
ReplicaGCQueueFailures: metric.NewCounter(metaReplicaGCQueueFailures),
ReplicaGCQueuePending: metric.NewGauge(metaReplicaGCQueuePending),
ReplicaGCQueueProcessingNanos: metric.NewCounter(metaReplicaGCQueueProcessingNanos),
ReplicateQueueEnqueueAdd: metric.NewCounter(metaReplicateQueueEnqueueAdd),
ReplicateQueueEnqueueFailedPrecondition: metric.NewCounter(metaReplicateQueueEnqueueFailedPrecondition),
ReplicateQueueEnqueueNoAction: metric.NewCounter(metaReplicateQueueEnqueueNoAction),
ReplicateQueueEnqueueUnexpectedError: metric.NewCounter(metaReplicateQueueEnqueueUnexpectedError),
ReplicateQueueSuccesses: metric.NewCounter(metaReplicateQueueSuccesses),
ReplicateQueueFailures: metric.NewCounter(metaReplicateQueueFailures),
ReplicateQueuePending: metric.NewGauge(metaReplicateQueuePending),
Expand Down
74 changes: 72 additions & 2 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,20 @@ type queueConfig struct {
processDestroyedReplicas bool
// processTimeout returns the timeout for processing a replica.
processTimeoutFunc queueProcessTimeoutFunc
// enqueueAdd is a counter of replicas that were successfully added to the
// queue.
enqueueAdd *metric.Counter
// enqueueFailedPrecondition is a counter of replicas that failed the
// precondition checks and were therefore not added to the queue.
enqueueFailedPrecondition *metric.Counter
// enqueueNoAction is a counter of replicas that had ShouldQueue determine no
// action was needed and were therefore not added to the queue.
enqueueNoAction *metric.Counter
// enqueueUnexpectedError is a counter of replicas that were expected to be
// enqueued (either had ShouldQueue return true or the caller explicitly
// requested to be added to the queue directly), but failed to be enqueued
// during the enqueue process (such as when Async was rate limited).
enqueueUnexpectedError *metric.Counter
// successes is a counter of replicas processed successfully.
successes *metric.Counter
// failures is a counter of replicas which failed processing.
Expand Down Expand Up @@ -733,16 +747,62 @@ func (bq *baseQueue) AddAsyncWithCallback(
h.Add(ctx, repl, prio, cb)
}); err != nil {
cb.onEnqueueResult(-1 /*indexOnHeap*/, err)
bq.updateMetricsOnEnqueueUnexpectedError()
}
}

// AddAsync adds the replica to the queue. Unlike MaybeAddAsync, it will wait
// for other operations to finish instead of turning into a noop (because
// unlike MaybeAdd, Add is not subject to being called opportunistically).
// If Async returns an error, the enqueue unexpected-error metric is bumped.
func (bq *baseQueue) AddAsync(ctx context.Context, repl replicaInQueue, prio float64) {
_ = bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
h.Add(ctx, repl, prio, noopProcessCallback)
})
}); err != nil {
// We don't update metrics in MaybeAddAsync because we don't know if the
// replica should be queued at this point. We only count it as an unexpected
// error when we're certain the replica should be enqueued. In this case,
// the caller explicitly wants to add the replica to the queue directly, so
// we do update the metrics on unexpected error.
bq.updateMetricsOnEnqueueUnexpectedError()
}
}

// updateMetricsOnEnqueueFailedPrecondition bumps the enqueueFailedPrecondition
// counter by one. It is meant for replicas that did not pass the precondition
// checks (replicaCanBeProcessed) and therefore are not considered for
// enqueueing, e.g. the replica has no valid lease, is uninitialized, is
// destroyed, the span config reader could not be retrieved, or the range is
// unsplit. It is a no-op when the queue has no such counter configured.
func (bq *baseQueue) updateMetricsOnEnqueueFailedPrecondition() {
	counter := bq.enqueueFailedPrecondition
	if counter == nil {
		return
	}
	counter.Inc(1)
}

// updateMetricsOnEnqueueNoAction bumps the enqueueNoAction counter by one. It
// is meant for replicas that shouldQueue decided need no action and that are
// consequently not added to the queue. It is a no-op when the queue has no
// such counter configured.
func (bq *baseQueue) updateMetricsOnEnqueueNoAction() {
	counter := bq.enqueueNoAction
	if counter == nil {
		return
	}
	counter.Inc(1)
}

// updateMetricsOnEnqueueUnexpectedError bumps the enqueueUnexpectedError
// counter by one. It is meant for replicas that were expected to be enqueued
// (ShouldQueue returned true, or the caller explicitly asked for the replica
// to be added to the queue directly) but were not enqueued because the
// enqueue process itself failed, such as when Async was rate limited. It is a
// no-op when the queue has no such counter configured.
func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() {
	counter := bq.enqueueUnexpectedError
	if counter == nil {
		return
	}
	counter.Inc(1)
}

// updateMetricsOnEnqueueAdd bumps the enqueueAdd counter by one. It is meant
// for replicas that were successfully added to the queue. It is a no-op when
// the queue has no such counter configured.
func (bq *baseQueue) updateMetricsOnEnqueueAdd() {
	counter := bq.enqueueAdd
	if counter == nil {
		return
	}
	counter.Inc(1)
}

func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
Expand Down Expand Up @@ -779,6 +839,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
// Load the system config if it's needed.
confReader, err := bq.replicaCanBeProcessed(ctx, repl, false /* acquireLeaseIfNeeded */)
if err != nil {
bq.updateMetricsOnEnqueueFailedPrecondition()
return
}

Expand All @@ -788,17 +849,20 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
realRepl, _ := repl.(*Replica)
should, priority := bq.impl.shouldQueue(ctx, now, realRepl, confReader)
if !should {
bq.updateMetricsOnEnqueueNoAction()
return
}

extConf := bq.skipIfReplicaHasExternalFilesConfig
if extConf != nil && extConf.Get(&bq.store.cfg.Settings.SV) {
hasExternal, err := realRepl.HasExternalBytes()
if err != nil {
bq.updateMetricsOnEnqueueUnexpectedError()
log.Dev.Warningf(ctx, "could not determine if %s has external bytes: %s", realRepl, err)
return
}
if hasExternal {
bq.updateMetricsOnEnqueueUnexpectedError()
log.Dev.VInfof(ctx, 1, "skipping %s for %s because it has external bytes", bq.name, realRepl)
return
}
Expand Down Expand Up @@ -841,8 +905,10 @@ func (bq *baseQueue) addInternal(
cb processCallback,
) (added bool, err error) {
defer func() {
// INVARIANT: added => err == nil.
if err != nil {
cb.onEnqueueResult(-1 /* indexOnHeap */, err)
bq.updateMetricsOnEnqueueUnexpectedError()
}
}()
// NB: this is intentionally outside of bq.mu to avoid having to consider
Expand Down Expand Up @@ -935,6 +1001,10 @@ func (bq *baseQueue) addInternal(
default:
// No need to signal again.
}
// Note that we are bumping enqueueAdd here instead of during defer to avoid
// treating requeuing a processing replica as newly added. They will be
// re-added to the queue later which will double count them.
bq.updateMetricsOnEnqueueAdd()
// Note: it may already be dropped or dropped afterwards.
cb.onEnqueueResult(item.index /*indexOnHeap*/, nil)
return true, nil
Expand Down
Loading