diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 40c894892048..ee2740896c70 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -533,6 +533,7 @@
 <tr><td>STORAGE</td><td>rangekeybytes</td><td>Number of bytes taken up by range keys (e.g. MVCC range tombstones)</td><td>Storage</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>rangekeycount</td><td>Count of all range keys (e.g. MVCC range tombstones)</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>ranges</td><td>Number of ranges</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>STORAGE</td><td>ranges.decommissioning</td><td>Number of ranges with at least one replica on a decommissioning node</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>ranges.overreplicated</td><td>Number of ranges with more live replicas than the replication target</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>ranges.unavailable</td><td>Number of ranges with fewer live replicas than needed for quorum</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>STORAGE</td><td>ranges.underreplicated</td><td>Number of ranges with fewer live replicas than the replication target</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index ad7d95630cd7..428686cd01b9 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -120,6 +120,12 @@ var (
 		Measurement: "Ranges",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaDecommissioningRangeCount = metric.Metadata{
+		Name:        "ranges.decommissioning",
+		Help:        "Number of ranges with at least one replica on a decommissioning node",
+		Measurement: "Ranges",
+		Unit:        metric.Unit_COUNT,
+	}

 	// Lease request metrics.
 	metaLeaseRequestSuccessCount = metric.Metadata{
@@ -2566,6 +2572,7 @@ type StoreMetrics struct {
 	UnavailableRangeCount     *metric.Gauge
 	UnderReplicatedRangeCount *metric.Gauge
 	OverReplicatedRangeCount  *metric.Gauge
+	DecommissioningRangeCount *metric.Gauge

 	// Lease request metrics for successful and failed lease requests. These
 	// count proposals (i.e. it does not matter how many replicas apply the
@@ -3263,6 +3270,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 		UnavailableRangeCount:     metric.NewGauge(metaUnavailableRangeCount),
 		UnderReplicatedRangeCount: metric.NewGauge(metaUnderReplicatedRangeCount),
 		OverReplicatedRangeCount:  metric.NewGauge(metaOverReplicatedRangeCount),
+		DecommissioningRangeCount: metric.NewGauge(metaDecommissioningRangeCount),

 		// Lease request metrics.
 		LeaseRequestSuccessCount: metric.NewCounter(metaLeaseRequestSuccessCount),
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 1bb4a70939cc..13767ce4249f 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
@@ -927,6 +928,12 @@ type Replica struct {
 	// changes for a range on the leaseholder.
 	allocatorToken *plan.AllocatorToken

+	// lastProblemRangeReplicateEnqueueTime is the last time this replica was
+	// eagerly enqueued into the replicate queue due to being underreplicated
+	// or having a decommissioning replica. This is used to throttle enqueue
+	// attempts.
+	lastProblemRangeReplicateEnqueueTime atomic.Value
+
 	// unreachablesMu contains a set of remote ReplicaIDs that are to be reported
 	// as unreachable on the next raft tick.
 	unreachablesMu struct {
@@ -2557,3 +2564,57 @@ func racV2EnabledWhenLeaderLevel(
 	// TODO(sumeer): implement fully, once all the dependencies are implemented.
 	return replica_rac2.NotEnabledWhenLeader
 }
+
+// maybeEnqueueProblemRange will enqueue the replica for processing into the
+// replicate queue iff:
+//
+//   - The replica is the holder of a valid lease.
+//   - EnqueueProblemRangeInReplicateQueueInterval is enabled (set to a
+//     non-zero value).
+//   - The last time the replica was enqueued is more than
+//     EnqueueProblemRangeInReplicateQueueInterval ago.
+//
+// The replica is enqueued at a decommissioning priority. Note that by default,
+// this behavior is disabled (zero interval). Also note that this method should
+// NOT be called unless the range is known to require action, e.g., it is
+// decommissioning or underreplicated.
+//
+// NOTE: This method is motivated by a bug where decommissioning stalls because
+// a decommissioning range is not enqueued in the replicate queue in a timely
+// manner via the replica scanner, see #130199. This functionality is disabled
+// by default for this reason.
+func (r *Replica) maybeEnqueueProblemRange(
+	ctx context.Context, now time.Time, leaseValid, isLeaseholder bool,
+) {
+	// The method expects the caller to provide whether the lease is valid and
+	// whether the replica is the leaseholder for the range, so that it can
+	// avoid unnecessary work. We expect this method to be called in the
+	// context of updating metrics.
+	if !isLeaseholder || !leaseValid {
+		// The replicate queue will not process the replica without a valid lease.
+		// Nothing to do.
+		return
+	}
+
+	interval := EnqueueProblemRangeInReplicateQueueInterval.Get(&r.store.cfg.Settings.SV)
+	if interval == 0 {
+		// The setting is disabled.
+		return
+	}
+	lastTime := r.lastProblemRangeReplicateEnqueueTime.Load().(time.Time)
+	if lastTime.Add(interval).After(now) {
+		// The last time the replica was enqueued is less than the interval ago,
+		// nothing to do.
+		return
+	}
+	// The replica is the leaseholder for a range which requires action and it
+	// has been longer than EnqueueProblemRangeInReplicateQueueInterval since the
+	// last time it was enqueued. Try to swap the last time with now. We don't
+	// expect a race, however if the value changed underneath us we won't enqueue
+	// the replica as we lost the race.
+	if !r.lastProblemRangeReplicateEnqueueTime.CompareAndSwap(lastTime, now) {
+		return
+	}
+	r.store.replicateQueue.AddAsync(ctx, r,
+		allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority())
+}
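The throttle above boils down to a single `atomic.Value` holding the last enqueue time: a caller that finds the interval elapsed tries to CompareAndSwap the timestamp forward, and only the winner enqueues. A minimal, self-contained sketch of that pattern (hypothetical names, not the kvserver types):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// throttledEnqueuer is a hypothetical stand-in for the replica-level state:
// one atomic last-enqueue time plus a fixed interval.
type throttledEnqueuer struct {
	lastEnqueue atomic.Value // holds a time.Time
	interval    time.Duration
}

func newThrottledEnqueuer(interval time.Duration) *throttledEnqueuer {
	e := &throttledEnqueuer{interval: interval}
	e.lastEnqueue.Store(time.Time{}) // seed so Load never returns nil
	return e
}

// maybeEnqueue reports whether the caller won the race and should enqueue.
func (e *throttledEnqueuer) maybeEnqueue(now time.Time) bool {
	if e.interval == 0 {
		return false // disabled, mirroring the zero-interval setting
	}
	last := e.lastEnqueue.Load().(time.Time)
	if last.Add(e.interval).After(now) {
		return false // throttled: enqueued too recently
	}
	// Swap last -> now. If another goroutine got there first, we lose the
	// race and skip the enqueue, just like the replica-level code.
	return e.lastEnqueue.CompareAndSwap(last, now)
}

func main() {
	e := newThrottledEnqueuer(time.Second)
	now := time.Now()
	fmt.Println(e.maybeEnqueue(now))                            // true: first attempt
	fmt.Println(e.maybeEnqueue(now.Add(500 * time.Millisecond))) // false: throttled
	fmt.Println(e.maybeEnqueue(now.Add(2 * time.Second)))       // true: interval elapsed
}
```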
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index 59fb42109d9f..dbd899faa439 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -184,6 +184,7 @@ func newUninitializedReplicaWithoutRaftGroup(
 			store.rebalanceObjManager.Objective().ToSplitObjective(),
 		)
 	}
+	r.lastProblemRangeReplicateEnqueueTime.Store(store.Clock().PhysicalTime())

 	// NB: the state will be loaded when the replica gets initialized.
 	r.mu.state = uninitState
diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go
index b6aac79f1a7e..818c62b34bf5 100644
--- a/pkg/kv/kvserver/replica_metrics.go
+++ b/pkg/kv/kvserver/replica_metrics.go
@@ -51,6 +51,7 @@ type ReplicaMetrics struct {
 	Unavailable     bool
 	Underreplicated bool
 	Overreplicated  bool
+	Decommissioning bool
 	RaftLogTooLarge bool
 	RangeTooLarge   bool
 	BehindCount     int64
@@ -172,7 +173,7 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics {
 		rangeTooLargeMultiple = 2
 	)
 	largeRangeThreshold := rangeTooLargeMultiple * d.conf.RangeMaxBytes
-	rangeCounter, unavailable, underreplicated, overreplicated, tooLarge := calcRangeCounter(
+	rangeCounter, unavailable, underreplicated, overreplicated, tooLarge, decommissioning := calcRangeCounter(
 		d.storeID, d.desc, d.leaseStatus, d.vitalityMap, d.conf.GetNumVoters(), d.conf.NumReplicas,
 		d.clusterNodes, largeRangeThreshold, d.rangeSize)

@@ -200,6 +201,7 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics {
 		Unavailable:     unavailable,
 		Underreplicated: underreplicated,
 		Overreplicated:  overreplicated,
+		Decommissioning: decommissioning,
 		RaftLogTooLarge: d.raftLogSizeTrusted &&
 			d.raftLogSize > raftLogTooLargeMultiple*d.raftCfg.RaftLogTruncationThreshold,
 		RangeTooLarge: tooLarge,
@@ -243,7 +245,7 @@ func calcRangeCounter(
 	numVoters, numReplicas int32,
 	clusterNodes int,
 	rangeTooLargeThreshold, rangeSize int64,
-) (rangeCounter, unavailable, underreplicated, overreplicated, tooLarge bool) {
+) (rangeCounter, unavailable, underreplicated, overreplicated, tooLarge, decommissioning bool) {
 	// If there is a live leaseholder (regardless of whether the lease is still
 	// valid) that leaseholder is responsible for range-level metrics.
 	if vitalityMap[leaseStatus.Lease.Replica.NodeID].IsLive(livenesspb.Metrics) {
@@ -279,6 +281,7 @@ func calcRangeCounter(
 			overreplicated = true
 		}
 		tooLarge = rangeSize > rangeTooLargeThreshold
+		decommissioning = calcDecommissioningCount(desc, vitalityMap) > 0
 	}
 	return
 }
@@ -333,6 +336,18 @@ func calcBehindCount(
 	return behindCount
 }

+func calcDecommissioningCount(
+	desc *roachpb.RangeDescriptor, vitalityMap livenesspb.NodeVitalityMap,
+) int {
+	var decommissioningCount int
+	for _, rd := range desc.Replicas().Descriptors() {
+		if vitalityMap[rd.NodeID].IsDecommissioning() {
+			decommissioningCount++
+		}
+	}
+	return decommissioningCount
+}
+
 // LoadStats returns the load statistics for the replica.
 func (r *Replica) LoadStats() load.ReplicaLoadStats {
 	return r.loadStats.Stats()
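For intuition, here is a simplified, dependency-free model of what `calcRangeCounter` now reports (hypothetical stand-in types, not the kvserver API): only the store holding a live leaseholder counts the range at all, and it flags the range as decommissioning iff any replica sits on a node the vitality map marks as decommissioning, so each range contributes at most once to the cluster-wide gauge.

```go
package main

import "fmt"

type nodeID int

// vitality is a toy stand-in for livenesspb.NodeVitality.
type vitality struct {
	live            bool
	decommissioning bool
}

type rangeDesc struct {
	replicas        []nodeID // nodes holding a replica of the range
	leaseholderNode nodeID
}

// rangeFlags mirrors the shape of calcRangeCounter's result: the range is
// counted (and flagged) only by the store that owns a live leaseholder.
func rangeFlags(localIsLeaseholderStore bool, d rangeDesc, v map[nodeID]vitality) (counted, decommissioning bool) {
	if !localIsLeaseholderStore || !v[d.leaseholderNode].live {
		return false, false
	}
	counted = true
	for _, n := range d.replicas {
		if v[n].decommissioning {
			decommissioning = true // at least one replica on a decommissioning node
		}
	}
	return counted, decommissioning
}

func main() {
	d := rangeDesc{replicas: []nodeID{1, 2, 3}, leaseholderNode: 1}
	v := map[nodeID]vitality{
		1: {live: true},
		2: {live: true, decommissioning: true}, // node 2 is decommissioning
		3: {live: true},
	}
	fmt.Println(rangeFlags(true, d, v))  // true true: counted and flagged
	fmt.Println(rangeFlags(false, d, v)) // false false: non-leaseholder stores skip it
}
```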
diff --git a/pkg/kv/kvserver/replica_metrics_test.go b/pkg/kv/kvserver/replica_metrics_test.go
index a5182f4ac1c6..58a58b265e4a 100644
--- a/pkg/kv/kvserver/replica_metrics_test.go
+++ b/pkg/kv/kvserver/replica_metrics_test.go
@@ -55,7 +55,7 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
 		}))

 	{
-		ctr, down, under, over, _ := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
+		ctr, down, under, over, _, decom := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
 			1000: livenesspb.FakeNodeVitality(true), // by NodeID
 		}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 0 /* rangeTooLargeThreshold */, 0 /* rangeSize */)
@@ -63,10 +63,11 @@
 		require.True(t, down)
 		require.True(t, under)
 		require.False(t, over)
+		require.False(t, decom)
 	}

 	{
-		ctr, down, under, over, _ := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
+		ctr, down, under, over, _, decom := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
 			1000: livenesspb.FakeNodeVitality(false),
 		}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 0 /* rangeTooLargeThreshold */, 0 /* rangeSize */)
@@ -76,10 +77,11 @@
 		require.False(t, down)
 		require.False(t, under)
 		require.False(t, over)
+		require.False(t, decom)
 	}

 	{
-		ctr, down, under, over, _ := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
+		ctr, down, under, over, _, decom := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
 			10:   livenesspb.FakeNodeVitality(true),
 			100:  livenesspb.FakeNodeVitality(true),
 			1000: livenesspb.FakeNodeVitality(true),
@@ -90,11 +92,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
 		require.False(t, down)
 		require.False(t, under)
 		require.False(t, over)
+		require.False(t, decom)
 	}

 	{
 		// Single non-voter dead
-		ctr, down, under, over, _ := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
+		ctr, down, under, over, _, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
 			10:   livenesspb.FakeNodeVitality(true),
 			100:  livenesspb.FakeNodeVitality(true),
 			1000: livenesspb.FakeNodeVitality(false),
@@ -105,11 +108,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
 		require.False(t, down)
 		require.True(t, under)
 		require.False(t, over)
+		require.False(t, decom)
 	}

 	{
 		// All non-voters are dead, but range is not unavailable
-		ctr, down, under, over, _ := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
+		ctr, down, under, over, _, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
 			10:   livenesspb.FakeNodeVitality(true),
 			100:  livenesspb.FakeNodeVitality(false),
 			1000: livenesspb.FakeNodeVitality(false),
@@ -120,11 +124,12 @@ func TestCalcRangeCounterIsLiveMap(t *testing.T) {
 		require.False(t, down)
 		require.True(t, under)
 		require.False(t, over)
+		require.False(t, decom)
 	}

 	{
 		// More non-voters than needed
-		ctr, down, under, over, _ := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
+		ctr, down, under, over, _, decom := calcRangeCounter(11, oneVoterAndThreeNonVoters, leaseStatus, livenesspb.NodeVitalityMap{
 			10:   livenesspb.FakeNodeVitality(true),
 			100:  livenesspb.FakeNodeVitality(true),
 			1000: livenesspb.FakeNodeVitality(true),
@@ -135,11 +140,12 @@
 		require.False(t, down)
 		require.False(t, under)
 		require.True(t, over)
+		require.False(t, decom)
 	}

 	{
 		// Range larger than the threshold.
-		ctr, _, _, _, large := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
+		ctr, _, _, _, large, _ := calcRangeCounter(1100, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
 			1000: livenesspb.FakeNodeVitality(true), // by NodeID
 		}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 1000 /* rangeTooLargeThreshold */, 2000 /* rangeSize */)
@@ -148,7 +154,7 @@
 	}

 	{
-		ctr, _, _, _, large := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
+		ctr, _, _, _, large, _ := calcRangeCounter(1000, threeVotersAndSingleNonVoter, leaseStatus, livenesspb.NodeVitalityMap{
 			1000: livenesspb.FakeNodeVitality(false),
 		}, 3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 1000 /* rangeTooLargeThreshold */, 2000 /* rangeSize */)
 		require.False(t, ctr)
@@ -156,6 +162,20 @@
 		// large.
 		require.False(t, large)
 	}
+
+	{
+		// Decommissioning node.
+		vitality := livenesspb.TestCreateNodeVitality(10, 100, 1000, 2000)
+		vitality.Decommissioning(100, true /* alive */)
+		ctr, down, under, over, _, decom := calcRangeCounter(11, threeVotersAndSingleNonVoter, leaseStatus, vitality.ScanNodeVitalityFromCache(),
+			3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 0 /* rangeTooLargeThreshold */, 0 /* rangeSize */)
+
+		require.True(t, ctr)
+		require.False(t, down)
+		require.False(t, under)
+		require.False(t, over)
+		require.True(t, decom)
+	}
 }

 func TestCalcRangeCounterLeaseHolder(t *testing.T) {
@@ -262,7 +282,7 @@
 			for _, nodeID := range tc.liveNodes {
 				livenessMap[nodeID] = livenesspb.FakeNodeVitality(true)
 			}
-			ctr, _, _, _, _ := calcRangeCounter(tc.storeID, rangeDesc, tc.leaseStatus, livenessMap,
+			ctr, _, _, _, _, _ := calcRangeCounter(tc.storeID, rangeDesc, tc.leaseStatus, livenessMap,
 				3 /* numVoters */, 4 /* numReplicas */, 4 /* clusterNodes */, 0 /* rangeTooLargeThreshold */, 0 /* rangeSize */)
 			require.Equal(t, tc.expectCounter, ctr)
 		})
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 7a99316c040d..b0e255194425 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -89,6 +89,22 @@ var EnqueueInReplicateQueueOnSpanConfigUpdateEnabled = settings.RegisterBoolSett
 	true,
 )

+// EnqueueProblemRangeInReplicateQueueInterval controls the interval at which
+// problem ranges are enqueued into the replicate queue for processing, outside
+// of the normal scanner interval. A problem range is one which is
+// underreplicated or has a replica on a decommissioning store. The setting is
+// disabled when set to 0, which is the default.
+var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"kv.enqueue_in_replicate_queue_on_problem.interval",
+	"interval at which problem ranges are enqueued into the replicate queue for "+
+		"processing, outside of the normal scanner interval; a problem range is "+
+		"one which is underreplicated or has a replica on a decommissioning store, "+
+		"disabled when set to 0",
+	0,
+	settings.NonNegativeDuration,
+)
+
 var (
 	metaReplicateQueueAddReplicaCount = metric.Metadata{
 		Name: "queue.replicate.addreplica",
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 4884de5b55da..41c8aa263996 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -2436,3 +2436,70 @@ func TestReplicateQueueAllocatorToken(t *testing.T) {
 	var allocationError allocator.AllocationError
 	require.ErrorAs(t, processErr, &allocationError)
 }
+
+// TestReplicateQueueDecommissionScannerDisabled asserts that decommissioning
+// replicas are replaced by the replicate queue despite the scanner being
+// disabled, when EnqueueProblemRangeInReplicateQueueInterval is set to a
+// non-zero value (enabled).
+func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Enable enqueueing of problem ranges in the replicate queue at most once
+	// per second. We disable the scanner to ensure that the replicate queue
+	// doesn't rely on the scanner to process decommissioning replicas.
+	settings := cluster.MakeTestingClusterSettings()
+	kvserver.EnqueueProblemRangeInReplicateQueueInterval.Override(
+		context.Background(), &settings.SV, 1*time.Second)
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings:          settings,
+			DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
+			// Disable the scanner.
+			ScanInterval:    100 * time.Hour,
+			ScanMinIdleTime: 100 * time.Hour,
+			ScanMaxIdleTime: 100 * time.Hour,
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	decommissioningSrvIdx := 4
+	decommissioningSrv := tc.Server(decommissioningSrvIdx)
+	require.NoError(t, decommissioningSrv.Decommission(ctx,
+		livenesspb.MembershipStatus_DECOMMISSIONING,
+		[]roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()}))
+
+	// Ensure that the node is marked as decommissioning on every other node.
+	// Once this is set, we also know that the onDecommissioning callback has
+	// fired, which enqueues every range which is on the decommissioning node.
+	testutils.SucceedsSoon(t, func() error {
+		for i := 0; i < tc.NumServers(); i++ {
+			srv := tc.Server(i)
+			if _, exists := srv.DecommissioningNodeMap()[decommissioningSrv.NodeID()]; !exists {
+				return errors.Newf("node %d not detected to be decommissioning", decommissioningSrv.NodeID())
+			}
+		}
+		return nil
+	})
+
+	// Now add a replica to the decommissioning node and then enable the
+	// replicate queue. We expect that the replica will be removed after the
+	// decommissioning replica is noticed via maybeEnqueueProblemRange.
+	scratchKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
+	tc.ToggleReplicateQueues(true /* active */)
+	testutils.SucceedsSoon(t, func() error {
+		var descs []*roachpb.RangeDescriptor
+		tc.GetFirstStoreFromServer(t, decommissioningSrvIdx).VisitReplicas(func(r *kvserver.Replica) bool {
+			descs = append(descs, r.Desc())
+			return true
+		})
+		if len(descs) != 0 {
+			return errors.Errorf("expected no replicas, found %d: %v", len(descs), descs)
+		}
+		return nil
+	})
+}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index cd776dae0171..f0333685594f 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -3218,6 +3218,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 		unavailableRangeCount     int64
 		underreplicatedRangeCount int64
 		overreplicatedRangeCount  int64
+		decommissioningRangeCount int64
 		behindCount               int64
 		pausedFollowerCount       int64
 		ioOverload                float64
@@ -3237,6 +3238,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 	)

 	now := s.cfg.Clock.NowAsClockTimestamp()
+	goNow := now.ToTimestamp().GoTime()
 	clusterNodes := s.ClusterNodeCount()

 	s.mu.RLock()
@@ -3300,6 +3302,12 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 			if metrics.Overreplicated {
 				overreplicatedRangeCount++
 			}
+			if metrics.Decommissioning {
+				// NB: enqueueing from here is disabled by default, and is
+				// throttled and asynchronous when enabled.
+				rep.maybeEnqueueProblemRange(ctx, goNow, metrics.LeaseValid, metrics.Leaseholder)
+				decommissioningRangeCount++
+			}
 		}
 		pausedFollowerCount += metrics.PausedFollowerCount
 		pendingRaftProposalCount += metrics.PendingRaftProposalCount
@@ -3363,6 +3371,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 	s.metrics.UnavailableRangeCount.Update(unavailableRangeCount)
 	s.metrics.UnderReplicatedRangeCount.Update(underreplicatedRangeCount)
 	s.metrics.OverReplicatedRangeCount.Update(overreplicatedRangeCount)
+	s.metrics.DecommissioningRangeCount.Update(decommissioningRangeCount)
 	s.metrics.RaftLogFollowerBehindCount.Update(behindCount)
 	s.metrics.RaftPausedFollowerCount.Update(pausedFollowerCount)
 	s.metrics.IOOverload.Update(ioOverload)
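Beyond the assertions in the diff, the new gauge itself can be observed from a test. A hypothetical fragment, not part of the change, assuming a testcluster `tc` as in the test above and a point in time where some range still has a replica on the decommissioning node (e.g. before the replicate queue is re-enabled); updateReplicationGauges recomputes these gauges periodically, so the fragment polls:

```go
// Poll until a non-decommissioning store (store 0 here), which holds leases
// and therefore computes the range-level gauges for the ranges it leads,
// reports a non-zero ranges.decommissioning count.
testutils.SucceedsSoon(t, func() error {
	if n := tc.GetFirstStoreFromServer(t, 0).Metrics().DecommissioningRangeCount.Value(); n == 0 {
		return errors.New("expected ranges.decommissioning > 0")
	}
	return nil
})
```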