1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -511,6 +511,7 @@
<tr><td>STORAGE</td><td>rangekeybytes</td><td>Number of bytes taken up by range keys (e.g. MVCC range tombstones)</td><td>Storage</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>rangekeycount</td><td>Count of all range keys (e.g. MVCC range tombstones)</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges</td><td>Number of ranges</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.decommissioning</td><td>Number of ranges with at least one replica on a decommissioning node</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.overreplicated</td><td>Number of ranges with more live replicas than the replication target</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.unavailable</td><td>Number of ranges with fewer live replicas than needed for quorum</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>ranges.underreplicated</td><td>Number of ranges with fewer live replicas than the replication target</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -120,6 +120,12 @@ var (
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDecommissioningRangeCount = metric.Metadata{
Name: "ranges.decommissioning",
Help: "Number of ranges with at lease one replica on a decommissioning node",
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}

// Lease request metrics.
metaLeaseRequestSuccessCount = metric.Metadata{
@@ -2343,6 +2349,7 @@ type StoreMetrics struct {
UnavailableRangeCount *metric.Gauge
UnderReplicatedRangeCount *metric.Gauge
OverReplicatedRangeCount *metric.Gauge
DecommissioningRangeCount *metric.Gauge

// Lease request metrics for successful and failed lease requests. These
// count proposals (i.e. it does not matter how many replicas apply the
@@ -3012,6 +3019,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
UnavailableRangeCount: metric.NewGauge(metaUnavailableRangeCount),
UnderReplicatedRangeCount: metric.NewGauge(metaUnderReplicatedRangeCount),
OverReplicatedRangeCount: metric.NewGauge(metaOverReplicatedRangeCount),
DecommissioningRangeCount: metric.NewGauge(metaDecommissioningRangeCount),

// Lease request metrics.
LeaseRequestSuccessCount: metric.NewCounter(metaLeaseRequestSuccessCount),
66 changes: 66 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
@@ -887,6 +888,12 @@ type Replica struct {
// loadBasedSplitter keeps information about load-based splitting.
loadBasedSplitter split.Decider

// lastProblemRangeReplicateEnqueueTime is the last time this replica was
// eagerly enqueued into the replicate queue due to being underreplicated
// or having a decommissioning replica. This is used to throttle enqueue
// attempts.
lastProblemRangeReplicateEnqueueTime atomic.Value

// unreachablesMu contains a set of remote ReplicaIDs that are to be reported
// as unreachable on the next raft tick.
unreachablesMu struct {
@@ -2451,3 +2458,62 @@ func (r *Replica) ReadProtectedTimestampsForTesting(ctx context.Context) (err er
ts, err = r.readProtectedTimestampsRLocked(ctx)
return err
}

// GetMutexForTesting returns the replica's mutex, for use in tests.
func (r *Replica) GetMutexForTesting() *ReplicaMutex {
return &r.mu.ReplicaMutex
}

// maybeEnqueueProblemRange will enqueue the replica for processing into the
// replicate queue iff:
//
// - The replica is the holder of a valid lease.
// - EnqueueProblemRangeInReplicateQueueInterval is enabled (set to a
// non-zero value)
// - The last time the replica was enqueued is longer than
// EnqueueProblemRangeInReplicateQueueInterval.
//
// The replica is enqueued at a decommissioning priority. Note that this
// behavior is disabled by default (zero interval). Also note that this method
// should NOT be called unless the range is known to require action, e.g., it
// is decommissioning or underreplicated.
//
// NOTE: This method is motivated by a bug where decommissioning stalls because
// a decommissioning range is not enqueued in the replicate queue in a timely
// manner via the replica scanner, see #130199. This functionality is disabled
// by default for this reason.
func (r *Replica) maybeEnqueueProblemRange(
ctx context.Context, now time.Time, leaseValid, isLeaseholder bool,
) {
// The method expects the caller to provide whether the lease is valid and
// whether the replica is the leaseholder for the range, so that it can avoid
// unnecessary work. We expect this method to be called in the context of
// updating metrics.
if !isLeaseholder || !leaseValid {
// The replicate queue will not process the replica without a valid lease.
// Nothing to do.
return
}

interval := EnqueueProblemRangeInReplicateQueueInterval.Get(&r.store.cfg.Settings.SV)
if interval == 0 {
// The setting is disabled.
return
}
lastTime := r.lastProblemRangeReplicateEnqueueTime.Load().(time.Time)
if lastTime.Add(interval).After(now) {
// The last time the replica was enqueued is less than the interval ago,
// nothing to do.
return
}
// The replica is the leaseholder for a range which requires action and it
// has been longer than EnqueueProblemRangeInReplicateQueueInterval since the
// last time it was enqueued. Try to swap the last time with now. We don't
// expect a race; however, if the value changed underneath us, we lost the
// race and won't enqueue the replica.
if !r.lastProblemRangeReplicateEnqueueTime.CompareAndSwap(lastTime, now) {
return
}
r.store.replicateQueue.AddAsync(ctx, r,
allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority())
}
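
For orientation, a minimal sketch of how a metrics-update caller might invoke this helper once the per-range flags have been computed; the names used here (metrics, now, leaseValid, isLeaseholder) are illustrative assumptions, since the actual call site is not part of this excerpt.

// Illustrative call-site sketch, not part of this change: invoked while
// updating replica metrics, and only when the range is known to need action.
if metrics.Underreplicated || metrics.Decommissioning {
	r.maybeEnqueueProblemRange(ctx, now, leaseValid, isLeaseholder)
}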
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -167,6 +167,7 @@ func newUninitializedReplicaWithoutRaftGroup(
store.rebalanceObjManager.Objective().ToSplitObjective(),
)
}
r.lastProblemRangeReplicateEnqueueTime.Store(store.Clock().PhysicalTime())

// NB: the state will be loaded when the replica gets initialized.
r.mu.state = uninitState
22 changes: 19 additions & 3 deletions pkg/kv/kvserver/replica_metrics.go
@@ -50,6 +50,7 @@ type ReplicaMetrics struct {
Unavailable bool
Underreplicated bool
Overreplicated bool
Decommissioning bool
RaftLogTooLarge bool
BehindCount int64
PausedFollowerCount int64
@@ -163,8 +164,9 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics {
}
}

rangeCounter, unavailable, underreplicated, overreplicated := calcRangeCounter(
d.storeID, d.desc, d.leaseStatus, d.vitalityMap, d.conf.GetNumVoters(), d.conf.NumReplicas, d.clusterNodes)
rangeCounter, unavailable, underreplicated, overreplicated, decommissioning := calcRangeCounter(
d.storeID, d.desc, d.leaseStatus, d.vitalityMap, d.conf.GetNumVoters(), d.conf.NumReplicas,
d.clusterNodes)

// The raft leader computes the number of raft entries that replicas are
// behind.
@@ -191,6 +193,7 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics {
Unavailable: unavailable,
Underreplicated: underreplicated,
Overreplicated: overreplicated,
Decommissioning: decommissioning,
RaftLogTooLarge: d.raftLogSizeTrusted &&
d.raftLogSize > raftLogTooLargeMultiple*d.raftCfg.RaftLogTruncationThreshold,
BehindCount: leaderBehindCount,
@@ -231,7 +234,7 @@ func calcRangeCounter(
vitalityMap livenesspb.NodeVitalityMap,
numVoters, numReplicas int32,
clusterNodes int,
) (rangeCounter, unavailable, underreplicated, overreplicated bool) {
) (rangeCounter, unavailable, underreplicated, overreplicated, decommissioning bool) {
// If there is a live leaseholder (regardless of whether the lease is still
// valid) that leaseholder is responsible for range-level metrics.
if vitalityMap[leaseStatus.Lease.Replica.NodeID].IsLive(livenesspb.Metrics) {
@@ -266,6 +269,7 @@
} else if neededVoters < liveVoters || neededNonVoters < liveNonVoters {
overreplicated = true
}
decommissioning = calcDecommissioningCount(desc, vitalityMap) > 0
}
return
}
@@ -320,6 +324,18 @@ func calcBehindCount(
return behindCount
}

func calcDecommissioningCount(
desc *roachpb.RangeDescriptor, vitalityMap livenesspb.NodeVitalityMap,
) int {
var decommissioningCount int
for _, rd := range desc.Replicas().Descriptors() {
if vitalityMap[rd.NodeID].IsDecommissioning() {
decommissioningCount++
}
}
return decommissioningCount
}
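
To connect the pieces: the per-range Decommissioning flag computed above is what ultimately feeds the new store-level ranges.decommissioning gauge. A rough sketch of that roll-up follows; the visitor and the Metrics argument list are assumptions, as the gauge-update call site is outside this diff.

// Assumed shape of the store-level aggregation (the real call site lives
// elsewhere, alongside the other replication gauges).
var decommissioningRangeCount int64
s.VisitReplicas(func(rep *Replica) bool {
	m := rep.Metrics(ctx, now, vitalityMap, clusterNodes) // argument list is illustrative
	if m.Decommissioning {
		decommissioningRangeCount++
	}
	return true
})
s.metrics.DecommissioningRangeCount.Update(decommissioningRangeCount)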

// LoadStats returns the load statistics for the replica.
func (r *Replica) LoadStats() load.ReplicaLoadStats {
return r.loadStats.Stats()
34 changes: 27 additions & 7 deletions pkg/kv/kvserver/replica_metrics_test.go
@@ -55,18 +55,19 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
}))

{
ctr, down, under, over := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, decom := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
1000: livenesspb.FakeNodeVitality(true), // by NodeID
}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */)

require.True(t, ctr)
require.True(t, down)
require.True(t, under)
require.False(t, over)
require.False(t, decom)
}

{
ctr, down, under, over := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, decom := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
1000: livenesspb.FakeNodeVitality(false),
}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */)

@@ -76,10 +77,11 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.False(t, over)
require.False(t, decom)
}

{
ctr, down, under, over := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, decom := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(true),
@@ -90,11 +92,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.False(t, over)
require.False(t, decom)
}

{
// Single non-voter dead
ctr, down, under, over := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(false),
@@ -105,11 +108,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.True(t, under)
require.False(t, over)
require.False(t, decom)
}

{
// All non-voters are dead, but range is not unavailable
ctr, down, under, over := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(false),
1000: livenesspb.FakeNodeVitality(false),
@@ -120,11 +124,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.True(t, under)
require.False(t, over)
require.False(t, decom)
}

{
// More non-voters than needed
ctr, down, under, over := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
ctr, down, under, over, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(true),
@@ -135,6 +140,21 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.True(t, over)
require.False(t, decom)
}

{
// Decommissioning node.
vitality := livenesspb.TestCreateNodeVitality(10, 100, 1000, 2000)
vitality.Decommissioning(100, true /* alive */)
ctr, down, under, over, decom := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, vitality.ScanNodeVitalityFromCache(),
3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */)

require.True(t, ctr)
require.False(t, down)
require.False(t, under)
require.False(t, over)
require.True(t, decom)
}
}

@@ -242,7 +262,7 @@ func TestCalcRangeCounterLeaseHolder(t *testing.T) {
for _, nodeID := range tc.liveNodes {
livenessMap[nodeID] = livenesspb.FakeNodeVitality(true)
}
ctr, _, _, _ := calcRangeCounter(tc.storeID, rangeDesc, tc.leaseStatus, livenessMap,
ctr, _, _, _, _ := calcRangeCounter(tc.storeID, rangeDesc, tc.leaseStatus, livenessMap,
3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */)
require.Equal(t, tc.expectCounter, ctr)
})
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/replicate_queue.go
@@ -113,6 +113,22 @@ var EnqueueInReplicateQueueOnSpanConfigUpdateEnabled = settings.RegisterBoolSett
true,
)

// EnqueueProblemRangeInReplicateQueueInterval controls the interval at which
// problem ranges are enqueued into the replicate queue for processing, outside
// of the normal scanner interval. A problem range is one which is
// underreplicated or has a replica on a decommissioning store. The setting is
// disabled when set to 0, which is the default.
var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"kv.enqueue_in_replicate_queue_on_problem.interval",
"interval at which problem ranges are enqueued into the replicate queue for "+
"processing, outside of the normal scanner interval; a problem range is "+
"one which is underreplicated or has a replica on a decommissioning store, "+
"disabled when set to 0",
0,
settings.NonNegativeDuration,
)
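
As a usage note, the setting can be enabled programmatically the same way the new test below does; the 30-second interval here is only an example value.

// Example only: allow eager enqueueing of problem ranges at most once per 30s.
st := cluster.MakeTestingClusterSettings()
EnqueueProblemRangeInReplicateQueueInterval.Override(ctx, &st.SV, 30*time.Second)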

var (
metaReplicateQueueAddReplicaCount = metric.Metadata{
Name: "queue.replicate.addreplica",
67 changes: 67 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -2629,3 +2629,70 @@ func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) {
return checkLeaseCount(nextPreferredNode, numRanges)
})
}

// TestReplicateQueueDecommissionScannerDisabled asserts that decommissioning
// replicas are replaced by the replicate queue despite the scanner being
// disabled, when EnqueueProblemRangeInReplicateQueueInterval is set to a
// non-zero value (enabled).
func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Enable enqueueing of problem ranges in the replicate queue at most once
// per second. We disable the scanner to ensure that the replicate queue
// doesn't rely on the scanner to process decommissioning replicas.
settings := cluster.MakeTestingClusterSettings()
kvserver.EnqueueProblemRangeInReplicateQueueInterval.Override(
context.Background(), &settings.SV, 1*time.Second)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: settings,
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
// Disable the scanner.
ScanInterval: 100 * time.Hour,
ScanMinIdleTime: 100 * time.Hour,
ScanMaxIdleTime: 100 * time.Hour,
},
})
defer tc.Stopper().Stop(ctx)

decommissioningSrvIdx := 4
decommissioningSrv := tc.Server(decommissioningSrvIdx)
require.NoError(t, decommissioningSrv.Decommission(ctx,
livenesspb.MembershipStatus_DECOMMISSIONING,
[]roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()}))

// Ensure that the node is marked as decommissioning on every other node.
// Once this is set, we also know that the onDecommissioning callback has
// fired, which enqueues every range which is on the decommissioning node.
testutils.SucceedsSoon(t, func() error {
for i := 0; i < tc.NumServers(); i++ {
srv := tc.Server(i)
if _, exists := srv.DecommissioningNodeMap()[decommissioningSrv.NodeID()]; !exists {
return errors.Newf("node %d not detected to be decommissioning", decommissioningSrv.NodeID())
}
}
return nil
})

// Now add a replica to the decommissioning node and then enable the
// replicate queue. We expect that the replica will be removed after the
// decommissioning replica is noticed via maybeEnqueueProblemRange.
scratchKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
tc.ToggleReplicateQueues(true /* active */)
testutils.SucceedsSoon(t, func() error {
var descs []*roachpb.RangeDescriptor
tc.GetFirstStoreFromServer(t, decommissioningSrvIdx).VisitReplicas(func(r *kvserver.Replica) bool {
descs = append(descs, r.Desc())
return true
})
if len(descs) != 0 {
return errors.Errorf("expected no replicas, found %d: %v", len(descs), descs)
}
return nil
})
}