1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -533,6 +533,7 @@
<tr><td>STORAGE</td><td>rangekeybytes</td><td>Number of bytes taken up by range keys (e.g. MVCC range tombstones)</td><td>Storage</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>rangekeycount</td><td>Count of all range keys (e.g. MVCC range tombstones)</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges</td><td>Number of ranges</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning</td><td>Number of ranges with at least one replica on a decommissioning node</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.overreplicated</td><td>Number of ranges with more live replicas than the replication target</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.unavailable</td><td>Number of ranges with fewer live replicas than needed for quorum</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.underreplicated</td><td>Number of ranges with fewer live replicas than the replication target</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -120,6 +120,12 @@ var (
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDecommissioningRangeCount = metric.Metadata{
Name: "ranges.decommissioning",
Help: "Number of ranges with at least one replica on a decommissioning node",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}

// Lease request metrics.
metaLeaseRequestSuccessCount = metric.Metadata{
@@ -2566,6 +2572,7 @@ type StoreMetrics struct {
UnavailableRangeCount *metric.Gauge
UnderReplicatedRangeCount *metric.Gauge
OverReplicatedRangeCount *metric.Gauge
DecommissioningRangeCount *metric.Gauge

// Lease request metrics for successful and failed lease requests. These
// count proposals (i.e. it does not matter how many replicas apply the
@@ -3263,6 +3270,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
UnavailableRangeCount: metric.NewGauge(metaUnavailableRangeCount),
UnderReplicatedRangeCount: metric.NewGauge(metaUnderReplicatedRangeCount),
OverReplicatedRangeCount: metric.NewGauge(metaOverReplicatedRangeCount),
DecommissioningRangeCount: metric.NewGauge(metaDecommissioningRangeCount),

// Lease request metrics.
LeaseRequestSuccessCount: metric.NewCounter(metaLeaseRequestSuccessCount),
61 changes: 61 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
@@ -927,6 +928,12 @@ type Replica struct {
// changes for a range on the leaseholder.
allocatorToken *plan.AllocatorToken

// lastProblemRangeReplicateEnqueueTime is the last time this replica was
// eagerly enqueued into the replicate queue due to being underreplicated
// or having a decommissioning replica. This is used to throttle enqueue
// attempts.
lastProblemRangeReplicateEnqueueTime atomic.Value

// unreachablesMu contains a set of remote ReplicaIDs that are to be reported
// as unreachable on the next raft tick.
unreachablesMu struct {
@@ -2557,3 +2564,57 @@ func racV2EnabledWhenLeaderLevel(
// TODO(sumeer): implement fully, once all the dependencies are implemented.
return replica_rac2.NotEnabledWhenLeader
}

// maybeEnqueueProblemRange will enqueue the replica for processing into the
// replicate queue iff:
//
// - The replica is the holder of a valid lease.
// - EnqueueProblemRangeInReplicateQueueInterval is enabled (set to a
// non-zero value).
// - The last time the replica was enqueued is longer than
// EnqueueProblemRangeInReplicateQueueInterval.
//
// The replica is enqueued at a decommissioning priority. Note that by default,
// this behavior is disabled (zero interval). Also note that this method should
// NOT be called unless the range is known to require action, e.g.,
// decommissioning|underreplicated.
//
// NOTE: This method is motivated by a bug where decommissioning stalls because
// a decommissioning range is not enqueued in the replicate queue in a timely
// manner via the replica scanner, see #130199. This functionality is disabled
// by default for this reason.
func (r *Replica) maybeEnqueueProblemRange(
ctx context.Context, now time.Time, leaseValid, isLeaseholder bool,
) {
// The method expects the caller to provide whether the lease is valid and
// the replica is the leaseholder for the range, so that it can avoid
// unnecessary work. We expect this method to be called in the context of
// updating metrics.
if !isLeaseholder || !leaseValid {
// The replicate queue will not process the replica without a valid lease.
// Nothing to do.
return
}

interval := EnqueueProblemRangeInReplicateQueueInterval.Get(&r.store.cfg.Settings.SV)
if interval == 0 {
// The setting is disabled.
return
}
lastTime := r.lastProblemRangeReplicateEnqueueTime.Load().(time.Time)
if lastTime.Add(interval).After(now) {
// The last time the replica was enqueued is less than the interval ago,
// nothing to do.
return
}
// The replica is the leaseholder for a range which requires action and it
// has been longer than EnqueueProblemRangeInReplicateQueueInterval since the
// last time it was enqueued. Try to swap the last time with now. We don't
// expect a race, however if the value changed underneath us we won't enqueue
// the replica as we lost the race.
if !r.lastProblemRangeReplicateEnqueueTime.CompareAndSwap(lastTime, now) {
return
}
r.store.replicateQueue.AddAsync(ctx, r,
allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority())
}
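
For illustration, here is a minimal, hypothetical sketch of how a caller in the per-replica metrics path might invoke maybeEnqueueProblemRange; the wrapper function and the ReplicaMetrics field names used below are assumptions for this sketch, not part of the change itself.

// Hypothetical caller sketch (assumed names, not from this diff): only enqueue
// when the range is already known to need action, as the method requires.
func (r *Replica) maybeEnqueueProblemRangeFromMetrics(
	ctx context.Context, now time.Time, m ReplicaMetrics,
) {
	if m.Underreplicated || m.Decommissioning {
		r.maybeEnqueueProblemRange(ctx, now, m.LeaseValid, m.Leaseholder)
	}
}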
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -184,6 +184,7 @@ func newUninitializedReplicaWithoutRaftGroup(
store.rebalanceObjManager.Objective().ToSplitObjective(),
)
}
r.lastProblemRangeReplicateEnqueueTime.Store(store.Clock().PhysicalTime())

// NB: the state will be loaded when the replica gets initialized.
r.mu.state = uninitState
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/replica_metrics.go
@@ -51,6 +51,7 @@ type ReplicaMetrics struct {
Unavailable bool
Underreplicated bool
Overreplicated bool
Decommissioning bool
RaftLogTooLarge bool
RangeTooLarge bool
BehindCount int64
@@ -172,7 +173,7 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics {
rangeTooLargeMultiple = 2
)
largeRangeThreshold := rangeTooLargeMultiple * d.conf.RangeMaxBytes
rangeCounter, unavailable, underreplicated, overreplicated, tooLarge := calcRangeCounter(
rangeCounter, unavailable, underreplicated, overreplicated, tooLarge, decommissioning := calcRangeCounter(
d.storeID, d.desc, d.leaseStatus, d.vitalityMap, d.conf.GetNumVoters(), d.conf.NumReplicas,
d.clusterNodes, largeRangeThreshold, d.rangeSize)

@@ -200,6 +201,7 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics {
Unavailable: unavailable,
Underreplicated: underreplicated,
Overreplicated: overreplicated,
Decommissioning: decommissioning,
RaftLogTooLarge: d.raftLogSizeTrusted &&
d.raftLogSize > raftLogTooLargeMultiple*d.raftCfg.RaftLogTruncationThreshold,
RangeTooLarge: tooLarge,
@@ -243,7 +245,7 @@ func calcRangeCounter(
numVoters, numReplicas int32,
clusterNodes int,
rangeTooLargeThreshold, rangeSize int64,
) (rangeCounter, unavailable, underreplicated, overreplicated, tooLarge bool) {
) (rangeCounter, unavailable, underreplicated, overreplicated, tooLarge, decommissioning bool) {
// If there is a live leaseholder (regardless of whether the lease is still
// valid) that leaseholder is responsible for range-level metrics.
if vitalityMap[leaseStatus.Lease.Replica.NodeID].IsLive(livenesspb.Metrics) {
@@ -279,6 +281,7 @@ func calcRangeCounter(
overreplicated = true
}
tooLarge = rangeSize > rangeTooLargeThreshold
decommissioning = calcDecommissioningCount(desc, vitalityMap) > 0
}
return
}
@@ -333,6 +336,18 @@ func calcBehindCount(
return behindCount
}

func calcDecommissioningCount(
desc *roachpb.RangeDescriptor, vitalityMap livenesspb.NodeVitalityMap,
) int {
var decommissioningCount int
for _, rd := range desc.Replicas().Descriptors() {
if vitalityMap[rd.NodeID].IsDecommissioning() {
decommissioningCount++
}
}
return decommissioningCount
}
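
As a hedged sketch of how the new per-range Decommissioning flag could roll up into the store-level DecommissioningRangeCount gauge during a metrics scan (the loop shape, the replicaMetricsForStore slice, and the receiver s are assumptions for illustration):

// Illustrative aggregation (assumed surrounding code, not from this diff):
// count ranges whose metrics report a decommissioning replica and publish the
// total on the store gauge.
var decommissioningRangeCount int64
for _, m := range replicaMetricsForStore { // assumed []ReplicaMetrics
	if m.Decommissioning {
		decommissioningRangeCount++
	}
}
s.metrics.DecommissioningRangeCount.Update(decommissioningRangeCount)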

// LoadStats returns the load statistics for the replica.
func (r *Replica) LoadStats() load.ReplicaLoadStats {
return r.loadStats.Stats()
38 changes: 29 additions & 9 deletions pkg/kv/kvserver/replica_metrics_test.go
@@ -55,18 +55,19 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
}))

{
ctr, down, under, over, _ := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, _, decom := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
1000: livenesspb.FakeNodeVitality(true), // by NodeID
}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 0 /* rangeTooLargeThreshold */, 0 /* rangeSize */)

require.True(t, ctr)
require.True(t, down)
require.True(t, under)
require.False(t, over)
require.False(t, decom)
}

{
ctr, down, under, over, _ := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, _, decom := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
1000: livenesspb.FakeNodeVitality(false),
}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 0 /* rangeTooLargeThreshold */, 0 /* rangeSize */)

@@ -76,10 +77,11 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.False(t, over)
require.False(t, decom)
}

{
ctr, down, under, over, _ := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, _, decom := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(true),
@@ -90,11 +92,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.False(t, over)
require.False(t, decom)
}

{
// Single non-voter dead
ctr, down, under, over, _ := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, _, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(false),
@@ -105,11 +108,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.True(t, under)
require.False(t, over)
require.False(t, decom)
}

{
// All non-voters are dead, but range is not unavailable
ctr, down, under, over, _ := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, _, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(false),
1000: livenesspb.FakeNodeVitality(false),
@@ -120,11 +124,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.True(t, under)
require.False(t, over)
require.False(t, decom)
}

{
// More non-voters than needed
ctr, down, under, over, _ := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, _, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(true),
@@ -135,11 +140,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.True(t, over)
require.False(t, decom)
}

{
// Range larger than the threshold.
ctr, _, _, _, large := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
ctr, _, _, _, large, _ := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
1000: livenesspb.FakeNodeVitality(true), // by NodeID
}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 1000 /* rangeTooLargeThreshold */, 2000 /* rangeSize */)

@@ -148,14 +154,28 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
}

{
ctr, _, _, _, large := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
ctr, _, _, _, large, _ := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
1000: livenesspb.FakeNodeVitality(false),
}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 1000 /* rangeTooLargeThreshold */, 2000 /* rangeSize */)
require.False(t, ctr)
// Only the node responsible for the range can report if the range is too
// large.
require.False(t, large)
}

{
// Decommissioning node.
vitality := livenesspb.TestCreateNodeVitality(10, 100, 1000, 2000)
vitality.Decommissioning(100, true /* alive */)
ctr, down, under, over, _, decom := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, vitality.ScanNodeVitalityFromCache(),
3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 0 /* rangeTooLargeThreshold */, 0 /* rangeSize */)

require.True(t, ctr)
require.False(t, down)
require.False(t, under)
require.False(t, over)
require.True(t, decom)
}
}

func TestCalcRangeCounterLeaseHolder(t *testing.T) {
@@ -262,7 +282,7 @@ func TestCalcRangeCounterLeaseHolder(t *testing.T) {
for _, nodeID := range tc.liveNodes {
livenessMap[nodeID] = livenesspb.FakeNodeVitality(true)
}
ctr, _, _, _, _ := calcRangeCounter(tc.storeID, rangeDesc, tc.leaseStatus, livenessMap,
ctr, _, _, _, _, _ := calcRangeCounter(tc.storeID, rangeDesc, tc.leaseStatus, livenessMap,
3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 0 /* rangeTooLargeThreshold */, 0 /* rangeSize */)
require.Equal(t, tc.expectCounter, ctr)
})
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/replicate_queue.go
@@ -89,6 +89,22 @@ var EnqueueInReplicateQueueOnSpanConfigUpdateEnabled = settings.RegisterBoolSett
true,
)

// EnqueueProblemRangeInReplicateQueueInterval controls the interval at which
// problem ranges are enqueued into the replicate queue for processing, outside
// of the normal scanner interval. A problem range is one which is
// underreplicated or has a replica on a decommissioning store. The setting is
// disabled when set to 0. By default, the setting is disabled.
var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"kv.enqueue_in_replicate_queue_on_problem.interval",
"interval at which problem ranges are enqueued into the replicate queue for "+
"processing, outside of the normal scanner interval; a problem range is "+
"one which is underreplicated or has a replica on a decommissioning store, "+
"disabled when set to 0",
0,
settings.NonNegativeDuration,
)
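
Since the setting defaults to 0 (disabled), a short hedged example of enabling it in a test via the standard duration-setting override; the fifteen-minute value is arbitrary:

// Example (illustrative values, not from this diff): a non-zero interval turns
// on eager enqueueing of problem ranges; zero keeps the default disabled behavior.
st := cluster.MakeTestingClusterSettings()
EnqueueProblemRangeInReplicateQueueInterval.Override(ctx, &st.SV, 15*time.Minute)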

var (
metaReplicateQueueAddReplicaCount = metric.Metadata{
Name: "queue.replicate.addreplica",