diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 1a33183ca431..4b2d6f653232 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -511,6 +511,7 @@
| STORAGE | rangekeybytes | Number of bytes taken up by range keys (e.g. MVCC range tombstones) | Storage | GAUGE | BYTES | AVG | NONE |
| STORAGE | rangekeycount | Count of all range keys (e.g. MVCC range tombstones) | Keys | GAUGE | COUNT | AVG | NONE |
| STORAGE | ranges | Number of ranges | Ranges | GAUGE | COUNT | AVG | NONE |
+| STORAGE | ranges.decommissioning | Number of ranges with at least one replica on a decommissioning node | Ranges | GAUGE | COUNT | AVG | NONE |
| STORAGE | ranges.overreplicated | Number of ranges with more live replicas than the replication target | Ranges | GAUGE | COUNT | AVG | NONE |
| STORAGE | ranges.unavailable | Number of ranges with fewer live replicas than needed for quorum | Ranges | GAUGE | COUNT | AVG | NONE |
| STORAGE | ranges.underreplicated | Number of ranges with fewer live replicas than the replication target | Ranges | GAUGE | COUNT | AVG | NONE |
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index bf65bede0416..1ea9ec8939da 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -120,6 +120,12 @@ var (
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
+ metaDecommissioningRangeCount = metric.Metadata{
+ Name: "ranges.decommissioning",
+ Help: "Number of ranges with at least one replica on a decommissioning node",
+ Measurement: "Ranges",
+ Unit: metric.Unit_COUNT,
+ }
// Lease request metrics.
metaLeaseRequestSuccessCount = metric.Metadata{
@@ -2343,6 +2349,7 @@ type StoreMetrics struct {
UnavailableRangeCount *metric.Gauge
UnderReplicatedRangeCount *metric.Gauge
OverReplicatedRangeCount *metric.Gauge
+ DecommissioningRangeCount *metric.Gauge
// Lease request metrics for successful and failed lease requests. These
// count proposals (i.e. it does not matter how many replicas apply the
@@ -3012,6 +3019,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
UnavailableRangeCount: metric.NewGauge(metaUnavailableRangeCount),
UnderReplicatedRangeCount: metric.NewGauge(metaUnderReplicatedRangeCount),
OverReplicatedRangeCount: metric.NewGauge(metaOverReplicatedRangeCount),
+ DecommissioningRangeCount: metric.NewGauge(metaDecommissioningRangeCount),
// Lease request metrics.
LeaseRequestSuccessCount: metric.NewCounter(metaLeaseRequestSuccessCount),
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 41b2d4869646..6775ae3b7d6e 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
@@ -887,6 +888,12 @@ type Replica struct {
// loadBasedSplitter keeps information about load-based splitting.
loadBasedSplitter split.Decider
+ // lastProblemRangeReplicateEnqueueTime is the last time this replica was
+ // eagerly enqueued into the replicate queue due to being underreplicated
+ // or having a decommissioning replica. This is used to throttle enqueue
+ // attempts.
+ lastProblemRangeReplicateEnqueueTime atomic.Value
+
// unreachablesMu contains a set of remote ReplicaIDs that are to be reported
// as unreachable on the next raft tick.
unreachablesMu struct {
@@ -2451,3 +2458,62 @@ func (r *Replica) ReadProtectedTimestampsForTesting(ctx context.Context) (err er
ts, err = r.readProtectedTimestampsRLocked(ctx)
return err
}
+
+// GetMutexForTesting returns the replica's mutex, for use in tests.
+func (r *Replica) GetMutexForTesting() *ReplicaMutex {
+ return &r.mu.ReplicaMutex
+}
+
+// maybeEnqueueProblemRange will enqueue the replica for processing into the
+// replicate queue iff:
+//
+// - The replica is the holder of a valid lease.
+// - EnqueueProblemRangeInReplicateQueueInterval is enabled (set to a
+// non-zero value)
+// - The last time the replica was enqueued is longer than
+// EnqueueProblemRangeInReplicateQueueInterval.
+//
+// The replica is enqueued at a decommissioning priority. Note that by default,
+// this behavior is disabled (zero interval). Also note that this method should
+// NOT be called unless the range is known to require action, e.g., it is
+// underreplicated or has a replica on a decommissioning node.
+//
+// NOTE: This method is motivated by a bug where decommissioning stalls because
+// a decommissioning range is not enqueued in the replicate queue in a timely
+// manner via the replica scanner, see #130199. This functionality is disabled
+// by default for this reason.
+func (r *Replica) maybeEnqueueProblemRange(
+ ctx context.Context, now time.Time, leaseValid, isLeaseholder bool,
+) {
+ // The method expects the caller to provide whether the lease is valid and
+ // the replica is the leaseholder for the range, so that it can avoid
+ // unnecessary work. We expect this method to be called in the context of
+ // updating metrics.
+ if !isLeaseholder || !leaseValid {
+ // The replicate queue will not process the replica without a valid lease.
+ // Nothing to do.
+ return
+ }
+
+ interval := EnqueueProblemRangeInReplicateQueueInterval.Get(&r.store.cfg.Settings.SV)
+ if interval == 0 {
+ // The setting is disabled.
+ return
+ }
+ lastTime := r.lastProblemRangeReplicateEnqueueTime.Load().(time.Time)
+ if lastTime.Add(interval).After(now) {
+ // The last time the replica was enqueued is less than the interval ago,
+ // nothing to do.
+ return
+ }
+ // The replica is the leaseholder for a range which requires action and it
+ // has been longer than EnqueueProblemRangeInReplicateQueueInterval since the
+ // last time it was enqueued. Try to swap the last time with now. We don't
+ // expect a race; however, if the value changed underneath us, we lost the
+ // race and won't enqueue the replica.
+ if !r.lastProblemRangeReplicateEnqueueTime.CompareAndSwap(lastTime, now) {
+ return
+ }
+ r.store.replicateQueue.AddAsync(ctx, r,
+ allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority())
+}
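
The throttling above is a small lock-free pattern: remember the last enqueue
time in an atomic.Value and only proceed when a CompareAndSwap from the
observed value to "now" succeeds, so a concurrent caller that loses the race
simply skips its attempt. A minimal standalone sketch of the same pattern (the
rateLimiter type and its names are illustrative, not part of this change):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// rateLimiter fires at most once per interval; a zero interval disables it,
// mirroring the cluster setting consulted by maybeEnqueueProblemRange.
type rateLimiter struct {
	last     atomic.Value // holds a time.Time
	interval time.Duration
}

func newRateLimiter(interval time.Duration) *rateLimiter {
	rl := &rateLimiter{interval: interval}
	rl.last.Store(time.Time{})
	return rl
}

// maybeFire reports whether at least interval has elapsed since the last
// successful call and this caller won the CompareAndSwap.
func (rl *rateLimiter) maybeFire(now time.Time) bool {
	if rl.interval == 0 {
		return false // disabled
	}
	last := rl.last.Load().(time.Time)
	if last.Add(rl.interval).After(now) {
		return false // throttled: fired too recently
	}
	return rl.last.CompareAndSwap(last, now)
}

func main() {
	rl := newRateLimiter(time.Second)
	fmt.Println(rl.maybeFire(time.Now())) // true
	fmt.Println(rl.maybeFire(time.Now())) // false, still within the interval
}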
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index a18557c9a576..754c612881bf 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -167,6 +167,7 @@ func newUninitializedReplicaWithoutRaftGroup(
store.rebalanceObjManager.Objective().ToSplitObjective(),
)
}
+ r.lastProblemRangeReplicateEnqueueTime.Store(store.Clock().PhysicalTime())
// NB: the state will be loaded when the replica gets initialized.
r.mu.state = uninitState
diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go
index 95de87cb40c7..d5db3f8d465b 100644
--- a/pkg/kv/kvserver/replica_metrics.go
+++ b/pkg/kv/kvserver/replica_metrics.go
@@ -50,6 +50,7 @@ type ReplicaMetrics struct {
Unavailable bool
Underreplicated bool
Overreplicated bool
+ Decommissioning bool
RaftLogTooLarge bool
BehindCount int64
PausedFollowerCount int64
@@ -163,8 +164,9 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics {
}
}
- rangeCounter, unavailable, underreplicated, overreplicated := calcRangeCounter(
- d.storeID, d.desc, d.leaseStatus, d.vitalityMap, d.conf.GetNumVoters(), d.conf.NumReplicas, d.clusterNodes)
+ rangeCounter, unavailable, underreplicated, overreplicated, decommissioning := calcRangeCounter(
+ d.storeID, d.desc, d.leaseStatus, d.vitalityMap, d.conf.GetNumVoters(), d.conf.NumReplicas,
+ d.clusterNodes)
// The raft leader computes the number of raft entries that replicas are
// behind.
@@ -191,6 +193,7 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics {
Unavailable: unavailable,
Underreplicated: underreplicated,
Overreplicated: overreplicated,
+ Decommissioning: decommissioning,
RaftLogTooLarge: d.raftLogSizeTrusted &&
d.raftLogSize > raftLogTooLargeMultiple*d.raftCfg.RaftLogTruncationThreshold,
BehindCount: leaderBehindCount,
@@ -231,7 +234,7 @@ func calcRangeCounter(
vitalityMap livenesspb.NodeVitalityMap,
numVoters, numReplicas int32,
clusterNodes int,
-) (rangeCounter, unavailable, underreplicated, overreplicated bool) {
+) (rangeCounter, unavailable, underreplicated, overreplicated, decommissioning bool) {
// If there is a live leaseholder (regardless of whether the lease is still
// valid) that leaseholder is responsible for range-level metrics.
if vitalityMap[leaseStatus.Lease.Replica.NodeID].IsLive(livenesspb.Metrics) {
@@ -266,6 +269,7 @@ func calcRangeCounter(
} else if neededVoters < liveVoters || neededNonVoters < liveNonVoters {
overreplicated = true
}
+ decommissioning = calcDecommissioningCount(desc, vitalityMap) > 0
}
return
}
@@ -320,6 +324,18 @@ func calcBehindCount(
return behindCount
}
+func calcDecommissioningCount(
+ desc *roachpb.RangeDescriptor, vitalityMap livenesspb.NodeVitalityMap,
+) int {
+ var decommissioningCount int
+ for _, rd := range desc.Replicas().Descriptors() {
+ if vitalityMap[rd.NodeID].IsDecommissioning() {
+ decommissioningCount++
+ }
+ }
+ return decommissioningCount
+}
+
// LoadStats returns the load statistics for the replica.
func (r *Replica) LoadStats() load.ReplicaLoadStats {
return r.loadStats.Stats()
diff --git a/pkg/kv/kvserver/replica_metrics_test.go b/pkg/kv/kvserver/replica_metrics_test.go
index c6f1c0d0b409..46bc5a175791 100644
--- a/pkg/kv/kvserver/replica_metrics_test.go
+++ b/pkg/kv/kvserver/replica_metrics_test.go
@@ -55,7 +55,7 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
}))
{
- ctr, down, under, over := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
+ ctr, down, under, over, decom := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
1000: livenesspb.FakeNodeVitality(true), // by NodeID
}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */)
@@ -63,10 +63,11 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.True(t, down)
require.True(t, under)
require.False(t, over)
+ require.False(t, decom)
}
{
- ctr, down, under, over := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
+ ctr, down, under, over, decom := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
1000: livenesspb.FakeNodeVitality(false),
}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */)
@@ -76,10 +77,11 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.False(t, over)
+ require.False(t, decom)
}
{
- ctr, down, under, over := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
+ ctr, down, under, over, decom := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(true),
@@ -90,11 +92,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.False(t, over)
+ require.False(t, decom)
}
{
// Single non-voter dead
- ctr, down, under, over := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
+ ctr, down, under, over, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(false),
@@ -105,11 +108,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.True(t, under)
require.False(t, over)
+ require.False(t, decom)
}
{
// All non-voters are dead, but range is not unavailable
- ctr, down, under, over := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
+ ctr, down, under, over, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(false),
1000: livenesspb.FakeNodeVitality(false),
@@ -120,11 +124,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.True(t, under)
require.False(t, over)
+ require.False(t, decom)
}
{
// More non-voters than needed
- ctr, down, under, over := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
+ ctr, down, under, over, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
10: livenesspb.FakeNodeVitality(true),
100: livenesspb.FakeNodeVitality(true),
1000: livenesspb.FakeNodeVitality(true),
@@ -135,6 +140,21 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
require.False(t, down)
require.False(t, under)
require.True(t, over)
+ require.False(t, decom)
+ }
+
+ {
+ // Decommissioning node.
+ vitality := livenesspb.TestCreateNodeVitality(10, 100, 1000, 2000)
+ vitality.Decommissioning(100, true /* alive */)
+ ctr, down, under, over, decom := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, vitality.ScanNodeVitalityFromCache(),
+ 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */)
+
+ require.True(t, ctr)
+ require.False(t, down)
+ require.False(t, under)
+ require.False(t, over)
+ require.True(t, decom)
}
}
@@ -242,7 +262,7 @@ func TestCalcRangeCounterLeaseHolder(t *testing.T) {
for _, nodeID := range tc.liveNodes {
livenessMap[nodeID] = livenesspb.FakeNodeVitality(true)
}
- ctr, _, _, _ := calcRangeCounter(tc.storeID, rangeDesc, tc.leaseStatus, livenessMap,
+ ctr, _, _, _, _ := calcRangeCounter(tc.storeID, rangeDesc, tc.leaseStatus, livenessMap,
3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */)
require.Equal(t, tc.expectCounter, ctr)
})
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 3d8b49858757..139918cd587f 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -113,6 +113,22 @@ var EnqueueInReplicateQueueOnSpanConfigUpdateEnabled = settings.RegisterBoolSett
true,
)
+// EnqueueProblemRangeInReplicateQueueInterval controls the interval at which
+// problem ranges are enqueued into the replicate queue for processing, outside
+// of the normal scanner interval. A problem range is one which is
+// underreplicated or has a replica on a decommissioning store. The setting is
+// disabled when set to 0, which is the default.
+var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetting(
+ settings.SystemOnly,
+ "kv.enqueue_in_replicate_queue_on_problem.interval",
+ "interval at which problem ranges are enqueued into the replicate queue for "+
+ "processing, outside of the normal scanner interval; a problem range is "+
+ "one which is underreplicated or has a replica on a decommissioning store, "+
+ "disabled when set to 0",
+ 0,
+ settings.NonNegativeDuration,
+)
+
var (
metaReplicateQueueAddReplicaCount = metric.Metadata{
Name: "queue.replicate.addreplica",
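
For completeness, the setting can be enabled programmatically the same way the
test below does it; outside of tests it would normally be set via SET CLUSTER
SETTING kv.enqueue_in_replicate_queue_on_problem.interval = '30s'. A short
sketch against a testing Settings object (the surrounding wiring, e.g. passing
st to the server args, is elided):

package main

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

func main() {
	// A zero duration (the default) leaves the eager-enqueue path disabled;
	// any positive duration enables it and bounds how often a replica may be
	// re-enqueued outside the scanner.
	st := cluster.MakeTestingClusterSettings()
	kvserver.EnqueueProblemRangeInReplicateQueueInterval.Override(
		context.Background(), &st.SV, 30*time.Second)
	_ = st // e.g. pass via base.TestServerArgs{Settings: st}
}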
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 3cd2ea04cb5f..b5b0049f25ad 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -2629,3 +2629,70 @@ func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) {
return checkLeaseCount(nextPreferredNode, numRanges)
})
}
+
+// TestReplicateQueueDecommissionScannerDisabled asserts that decommissioning
+// replicas are replaced by the replicate queue despite the scanner being
+// disabled, when EnqueueProblemRangeInReplicateQueueInterval is set to a
+// non-zero value (enabled).
+func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ // Enable enqueueing of problem ranges in the replicate queue at most once
+ // per second. We disable the scanner to ensure that the replicate queue
+ // doesn't rely on the scanner to process decommissioning replicas.
+ settings := cluster.MakeTestingClusterSettings()
+ kvserver.EnqueueProblemRangeInReplicateQueueInterval.Override(
+ context.Background(), &settings.SV, 1*time.Second)
+
+ ctx := context.Background()
+ tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
+ ReplicationMode: base.ReplicationManual,
+ ServerArgs: base.TestServerArgs{
+ Settings: settings,
+ DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
+ // Disable the scanner.
+ ScanInterval: 100 * time.Hour,
+ ScanMinIdleTime: 100 * time.Hour,
+ ScanMaxIdleTime: 100 * time.Hour,
+ },
+ })
+ defer tc.Stopper().Stop(ctx)
+
+ decommissioningSrvIdx := 4
+ decommissioningSrv := tc.Server(decommissioningSrvIdx)
+ require.NoError(t, decommissioningSrv.Decommission(ctx,
+ livenesspb.MembershipStatus_DECOMMISSIONING,
+ []roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()}))
+
+ // Ensure that the node is marked as decommissioning on every other node.
+ // Once this is set, we also know that the onDecommissioning callback has
+ // fired, which enqueues every range with a replica on the decommissioning node.
+ testutils.SucceedsSoon(t, func() error {
+ for i := 0; i < tc.NumServers(); i++ {
+ srv := tc.Server(i)
+ if _, exists := srv.DecommissioningNodeMap()[decommissioningSrv.NodeID()]; !exists {
+ return errors.Newf("node %d not detected to be decommissioning", decommissioningSrv.NodeID())
+ }
+ }
+ return nil
+ })
+
+ // Now add a replica to the decommissioning node and then enable the
+ // replicate queue. We expect that the replica will be removed after the
+ // decommissioning replica is noticed via maybeEnqueueProblemRange.
+ scratchKey := tc.ScratchRange(t)
+ tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
+ tc.ToggleReplicateQueues(true /* active */)
+ testutils.SucceedsSoon(t, func() error {
+ var descs []*roachpb.RangeDescriptor
+ tc.GetFirstStoreFromServer(t, decommissioningSrvIdx).VisitReplicas(func(r *kvserver.Replica) bool {
+ descs = append(descs, r.Desc())
+ return true
+ })
+ if len(descs) != 0 {
+ return errors.Errorf("expected no replicas, found %d: %v", len(descs), descs)
+ }
+ return nil
+ })
+}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 9ce695ae099b..7c304b0e6a57 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -3046,6 +3046,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
unavailableRangeCount int64
underreplicatedRangeCount int64
overreplicatedRangeCount int64
+ decommissioningRangeCount int64
behindCount int64
pausedFollowerCount int64
ioOverload float64
@@ -3065,6 +3066,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
)
now := s.cfg.Clock.NowAsClockTimestamp()
+ goNow := now.ToTimestamp().GoTime()
clusterNodes := s.ClusterNodeCount()
s.mu.RLock()
@@ -3126,6 +3128,12 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
if metrics.Overreplicated {
overreplicatedRangeCount++
}
+ if metrics.Decommissioning {
+ // NB: enqueueing from here is disabled by default (zero interval) and,
+ // when enabled, is throttled and performed asynchronously.
+ rep.maybeEnqueueProblemRange(ctx, goNow, metrics.LeaseValid, metrics.Leaseholder)
+ decommissioningRangeCount++
+ }
}
pausedFollowerCount += metrics.PausedFollowerCount
pendingRaftProposalCount += metrics.PendingRaftProposalCount
@@ -3188,6 +3196,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
s.metrics.UnavailableRangeCount.Update(unavailableRangeCount)
s.metrics.UnderReplicatedRangeCount.Update(underreplicatedRangeCount)
s.metrics.OverReplicatedRangeCount.Update(overreplicatedRangeCount)
+ s.metrics.DecommissioningRangeCount.Update(decommissioningRangeCount)
s.metrics.RaftLogFollowerBehindCount.Update(behindCount)
s.metrics.RaftPausedFollowerCount.Update(pausedFollowerCount)
s.metrics.IOOverload.Update(ioOverload)
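
With the gauge wired into updateReplicationGauges, its value can be read off a
store's metrics struct, e.g. when asserting on it in a test. A brief sketch,
assuming Store.Metrics() exposure as used elsewhere in kvserver tests and that
the replication gauges have already been refreshed by the periodic metrics
loop (the helper name is illustrative):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
)

// reportDecommissioningRanges prints the current value of the new
// ranges.decommissioning gauge for a single store.
func reportDecommissioningRanges(s *kvserver.Store) {
	fmt.Printf("ranges.decommissioning = %d\n",
		s.Metrics().DecommissioningRangeCount.Value())
}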