
Commit 5945d37

committed
kvserver: enqueue decom ranges at an interval behind a setting
When `kv.enqueue_in_replicate_queue_on_problem.interval` is set to a positive value, leaseholder replicas of ranges which have decommissioning replicas are enqueued into the replicate queue at that interval. When the setting is 0, no enqueueing on decommissioning takes place outside of the regular replica scanner.

A recommended value for users enabling the enqueue (non-zero) is at least 15 minutes, e.g.,

```
SET CLUSTER SETTING kv.enqueue_in_replicate_queue_on_problem.interval = '900s'
```

Resolves: #130085
Informs: #130199

Release note: None
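For illustration only, a minimal sketch of applying the recommended value from a Go client; the connection URL and the `github.com/lib/pq` driver are assumptions, not part of this change:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; any compatible driver works
)

func main() {
	// Assumed local single-node cluster; adjust the URL for your deployment.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Enable eager enqueueing of problem ranges, at most once per 15 minutes
	// per range, as recommended above.
	if _, err := db.Exec(
		"SET CLUSTER SETTING kv.enqueue_in_replicate_queue_on_problem.interval = '900s'",
	); err != nil {
		log.Fatal(err)
	}

	// Setting the interval back to 0 disables the eager enqueueing again.
	// _, _ = db.Exec("SET CLUSTER SETTING kv.enqueue_in_replicate_queue_on_problem.interval = '0s'")
}
```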
1 parent b95e1a5 commit 5945d37

5 files changed (+147, −0 lines)

pkg/kv/kvserver/replica.go

Lines changed: 61 additions & 0 deletions
@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
@@ -844,6 +845,12 @@ type Replica struct {
 	// loadBasedSplitter keeps information about load-based splitting.
 	loadBasedSplitter split.Decider
 
+	// lastProblemRangeReplicateEnqueueTime is the last time this replica was
+	// eagerly enqueued into the replicate queue due to being underreplicated
+	// or having a decommissioning replica. This is used to throttle enqueue
+	// attempts.
+	lastProblemRangeReplicateEnqueueTime atomic.Value
+
 	// unreachablesMu contains a set of remote ReplicaIDs that are to be reported
 	// as unreachable on the next raft tick.
 	unreachablesMu struct {
@@ -2356,3 +2363,57 @@ func (r *Replica) ReadProtectedTimestampsForTesting(ctx context.Context) (err er
 	ts, err = r.readProtectedTimestampsRLocked(ctx)
 	return err
 }
+
+// maybeEnqueueProblemRange will enqueue the replica for processing into the
+// replicate queue iff:
+//
+//   - The replica is the holder of a valid lease.
+//   - EnqueueProblemRangeInReplicateQueueInterval is enabled (set to a
+//     non-zero value)
+//   - The last time the replica was enqueued is longer than
+//     EnqueueProblemRangeInReplicateQueueInterval.
+//
+// The replica is enqueued at a decommissioning priority. Note that by default,
+// this behavior is disabled (zero interval). Also note that this method should
+// NOT be called unless the range is known to require action e.g.,
+// decommissioning|underreplicated.
+//
+// NOTE: This method is motivated by a bug where decommissioning stalls because
+// a decommissioning range is not enqueued in the replicate queue in a timely
+// manner via the replica scanner, see #130199. This functionality is disabled
+// by default for this reason.
+func (r *Replica) maybeEnqueueProblemRange(
+	ctx context.Context, now time.Time, leaseValid, isLeaseholder bool,
+) {
+	// The method expects the caller to provide whether the lease is valid and
+	// the replica is the leaseholder for the range, so that it can avoid
+	// unnecessary work. We expect this method to be called in the context of
+	// updating metrics.
+	if !isLeaseholder || !leaseValid {
+		// The replicate queue will not process the replica without a valid lease.
+		// Nothing to do.
+		return
+	}
+
+	interval := EnqueueProblemRangeInReplicateQueueInterval.Get(&r.store.cfg.Settings.SV)
+	if interval == 0 {
+		// The setting is disabled.
+		return
+	}
+	lastTime := r.lastProblemRangeReplicateEnqueueTime.Load().(time.Time)
+	if lastTime.Add(interval).After(now) {
+		// The last time the replica was enqueued is less than the interval ago,
+		// nothing to do.
+		return
+	}
+	// The replica is the leaseholder for a range which requires action and it
+	// has been longer than EnqueueProblemRangeInReplicateQueueInterval since the
+	// last time it was enqueued. Try to swap the last time with now. We don't
+	// expect a race, however if the value changed underneath us we won't enqueue
+	// the replica as we lost the race.
+	if !r.lastProblemRangeReplicateEnqueueTime.CompareAndSwap(lastTime, now) {
+		return
+	}
+	r.store.replicateQueue.AddAsync(ctx, r,
+		allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority())
+}
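The throttling above leans on an atomic.Value holding a time.Time: a caller only enqueues if it wins the CompareAndSwap from the last-observed time to now, so concurrent metric updates agree on at most one enqueue per interval. Below is a minimal, standalone sketch of that pattern (hypothetical `throttle` type, not CockroachDB code):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// throttle is a hypothetical helper mirroring the pattern above: the last
// firing time lives in an atomic.Value and is advanced with CompareAndSwap.
type throttle struct {
	last atomic.Value // holds a time.Time; must be Store'd before first use
}

// shouldFire reports whether interval has elapsed since the last firing and,
// if so, atomically claims now. Losing the CompareAndSwap race simply means
// another caller claimed this interval, so we skip, much like
// maybeEnqueueProblemRange does.
func (t *throttle) shouldFire(now time.Time, interval time.Duration) bool {
	if interval == 0 {
		return false // disabled, like the zero value of the cluster setting
	}
	last := t.last.Load().(time.Time)
	if last.Add(interval).After(now) {
		return false // fired too recently
	}
	return t.last.CompareAndSwap(last, now)
}

func main() {
	var t throttle
	t.last.Store(time.Time{}) // seed, analogous to the Store in replica_init.go
	now := time.Now()
	fmt.Println(t.shouldFire(now, time.Second))                           // true: first firing
	fmt.Println(t.shouldFire(now.Add(500*time.Millisecond), time.Second)) // false: throttled
	fmt.Println(t.shouldFire(now.Add(2*time.Second), time.Second))        // true: interval elapsed
}
```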

pkg/kv/kvserver/replica_init.go

Lines changed: 1 addition & 0 deletions
@@ -128,6 +128,7 @@ func newUninitializedReplica(
 			store.rebalanceObjManager.Objective().ToSplitObjective(),
 		)
 	}
+	r.lastProblemRangeReplicateEnqueueTime.Store(store.Clock().PhysicalTime())
 
 	// NB: the state will be loaded when the replica gets initialized.
 	r.mu.state = uninitState

pkg/kv/kvserver/replicate_queue.go

Lines changed: 16 additions & 0 deletions
@@ -91,6 +91,22 @@ var EnqueueInReplicateQueueOnSpanConfigUpdateEnabled = settings.RegisterBoolSett
 	false,
 )
 
+// EnqueueProblemRangeInReplicateQueueInterval controls the interval at which
+// problem ranges are enqueued into the replicate queue for processing, outside
+// of the normal scanner interval. A problem range is one which is
+// underreplicated or has a replica on a decommissioning store. The setting is
+// disabled when set to 0. By default, the setting is disabled.
+var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"kv.enqueue_in_replicate_queue_on_problem.interval",
+	"interval at which problem ranges are enqueued into the replicate queue for "+
+		"processing, outside of the normal scanner interval; a problem range is "+
+		"one which is underreplicated or has a replica on a decommissioning store, "+
+		"disabled when set to 0",
+	0,
+	settings.NonNegativeDuration,
+)
+
 var (
 	metaReplicateQueueAddReplicaCount = metric.Metadata{
 		Name: "queue.replicate.addreplica",

pkg/kv/kvserver/replicate_queue_test.go

Lines changed: 66 additions & 0 deletions
@@ -2778,3 +2778,69 @@ func TestReplicationChangesForRebalance(t *testing.T) {
 		})
 	}
 }
+
+// TestReplicateQueueDecommissionScannerDisabled asserts that decommissioning
+// replicas are replaced by the replicate queue despite the scanner being
+// disabled, when EnqueueProblemRangeInReplicateQueueInterval is set to a
+// non-zero value (enabled).
+func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Enable enqueueing of problem ranges in the replicate queue at most once
+	// per second. We disable the scanner to ensure that the replicate queue
+	// doesn't rely on the scanner to process decommissioning replicas.
+	settings := cluster.MakeTestingClusterSettings()
+	kvserver.EnqueueProblemRangeInReplicateQueueInterval.Override(
+		context.Background(), &settings.SV, 1*time.Second)
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings: settings,
+			// Disable the scanner.
+			ScanInterval:    100 * time.Hour,
+			ScanMinIdleTime: 100 * time.Hour,
+			ScanMaxIdleTime: 100 * time.Hour,
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	decommissioningSrvIdx := 4
+	decommissioningSrv := tc.Server(decommissioningSrvIdx)
+	require.NoError(t, decommissioningSrv.Decommission(ctx,
+		livenesspb.MembershipStatus_DECOMMISSIONING,
+		[]roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()}))
+
+	// Ensure that the node is marked as decommissioning on every other node.
+	// Once this is set, we also know that the onDecommissioning callback has
+	// fired, which enqueues every range which is on the decommissioning node.
+	testutils.SucceedsSoon(t, func() error {
+		for i := 0; i < tc.NumServers(); i++ {
+			srv := tc.Server(i)
+			if _, exists := srv.DecommissioningNodeMap()[decommissioningSrv.NodeID()]; !exists {
+				return errors.Newf("node %d not detected to be decommissioning", decommissioningSrv.NodeID())
+			}
+		}
+		return nil
+	})
+
+	// Now add a replica to the decommissioning node and then enable the
+	// replicate queue. We expect that the replica will be removed after the
+	// decommissioning replica is noticed via maybeEnqueueProblemRange.
+	scratchKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
+	tc.ToggleReplicateQueues(true /* active */)
+	testutils.SucceedsSoon(t, func() error {
+		var descs []*roachpb.RangeDescriptor
+		tc.GetFirstStoreFromServer(t, decommissioningSrvIdx).VisitReplicas(func(r *kvserver.Replica) bool {
+			descs = append(descs, r.Desc())
+			return true
+		})
+		if len(descs) != 0 {
+			return errors.Errorf("expected no replicas, found %d: %v", len(descs), descs)
+		}
+		return nil
+	})
+}

pkg/kv/kvserver/store.go

Lines changed: 3 additions & 0 deletions
@@ -3069,6 +3069,9 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 			overreplicatedRangeCount++
 		}
 		if metrics.Decommissioning {
+			// NB: Enqueue is disabled by default from here and throttled async if
+			// enabled.
+			rep.maybeEnqueueProblemRange(ctx, now.ToTimestamp().GoTime(), metrics.LeaseValid, metrics.Leaseholder)
 			decommissioningRangeCount++
 		}
 	}
