Commit e2f8dc2

server: react to decommissioning nodes by proactively enqueuing their replicas
Note: This patch implements a subset of cockroachdb#80836.

Previously, when a node was marked `DECOMMISSIONING`, other nodes in the
system would learn about it via gossip but wouldn't do much in the way of
reacting to it. They'd rely on their `replicaScanner` to gradually run into
the decommissioning node's ranges and rely on their `replicateQueue` to then
rebalance them.

This meant that even when decommissioning a mostly empty node, our worst case
lower bound for marking that node fully decommissioned was _one full scanner
interval_ (which is 10 minutes by default).

This patch improves this behavior by installing an idempotent callback that
is invoked every time a node is detected to be `DECOMMISSIONING`. When it is
run, the callback enqueues all the replicas on the local stores that are on
ranges that also have replicas on the decommissioning node.

Release note: None
1 parent 8a62c9d commit e2f8dc2
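To make the described mechanism concrete, here is a toy, self-contained Go sketch of the reaction path. It is not the patch itself: the types (nodeID, rangeID, replicaInfo) and the enqueue function are hypothetical stand-ins for roachpb.NodeID, the replica descriptors, and the replicateQueue that the real diffs below wire together. The sketch also records the node in its dedup map so that repeat invocations are no-ops, illustrating the idempotency the commit message refers to.

package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for illustration only; the real patch uses
// roachpb.NodeID, kvserver.Replica, and the store's replicateQueue.
type nodeID int
type rangeID int

type replicaInfo struct {
	rangeID rangeID
	nodes   []nodeID // nodes that hold a replica of this range
}

// decommissioningNodeMap mirrors the dedup map added in
// pkg/server/decommission.go. Recording each node we have reacted to makes
// the callback a no-op when gossip re-delivers the same liveness update.
type decommissioningNodeMap struct {
	mu    sync.Mutex
	nodes map[nodeID]struct{}
}

// onNodeDecommissioning enqueues every locally held range that also has a
// replica on the decommissioning node.
func (m *decommissioningNodeMap) onNodeDecommissioning(
	decommissioning nodeID, local []replicaInfo, enqueue func(rangeID),
) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.nodes[decommissioning]; ok {
		return // already enqueued this node's ranges; nothing more to do
	}
	m.nodes[decommissioning] = struct{}{}
	for _, r := range local {
		for _, n := range r.nodes {
			if n == decommissioning {
				enqueue(r.rangeID) // hand off to the (stand-in) replicate queue
				break
			}
		}
	}
}

func main() {
	m := &decommissioningNodeMap{nodes: make(map[nodeID]struct{})}
	local := []replicaInfo{
		{rangeID: 1, nodes: []nodeID{1, 2, 3}},
		{rangeID: 2, nodes: []nodeID{1, 4, 5}},
	}
	enqueue := func(r rangeID) { fmt.Printf("enqueuing r%d for rebalancing\n", r) }
	m.onNodeDecommissioning(2, local, enqueue) // enqueues r1
	m.onNodeDecommissioning(2, local, enqueue) // idempotent: no-op
}

Running this enqueues r1 once for node 2 and ignores the duplicate notification. The real callback additionally filters on lease ownership, as seen in pkg/server/decommission.go below.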

File tree

7 files changed: +216 -22 lines changed

pkg/kv/kvserver/liveness/liveness.go
pkg/kv/kvserver/store.go
pkg/kv/kvserver/testing_knobs.go
pkg/roachpb/metadata_replicas.go
pkg/server/admin_test.go
pkg/server/decommission.go
pkg/server/server.go

pkg/kv/kvserver/liveness/liveness.go

Lines changed: 27 additions & 15 deletions
@@ -193,10 +193,11 @@ type NodeLiveness struct {
 	// heartbeatPaused contains an atomically-swapped number representing a bool
 	// (1 or 0). heartbeatToken is a channel containing a token which is taken
 	// when heartbeating or when pausing the heartbeat. Used for testing.
-	heartbeatPaused      uint32
-	heartbeatToken       chan struct{}
-	metrics              Metrics
-	onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
+	heartbeatPaused       uint32
+	heartbeatToken        chan struct{}
+	metrics               Metrics
+	onNodeDecommissioned  func(livenesspb.Liveness) // noop if nil
+	onNodeDecommissioning OnNodeDecommissionCallback // noop if nil

 	mu struct {
 		syncutil.RWMutex
@@ -274,23 +275,27 @@ type NodeLivenessOptions struct {
 	// idempotent as it may be invoked multiple times and defaults to a
 	// noop.
 	OnNodeDecommissioned func(livenesspb.Liveness)
+	// OnNodeDecommissioning is invoked when a node is detected to be
+	// decommissioning.
+	OnNodeDecommissioning OnNodeDecommissionCallback
 }

 // NewNodeLiveness returns a new instance of NodeLiveness configured
 // with the specified gossip instance.
 func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
 	nl := &NodeLiveness{
-		ambientCtx:           opts.AmbientCtx,
-		clock:                opts.Clock,
-		db:                   opts.DB,
-		gossip:               opts.Gossip,
-		livenessThreshold:    opts.LivenessThreshold,
-		renewalDuration:      opts.RenewalDuration,
-		selfSem:              make(chan struct{}, 1),
-		st:                   opts.Settings,
-		otherSem:             make(chan struct{}, 1),
-		heartbeatToken:       make(chan struct{}, 1),
-		onNodeDecommissioned: opts.OnNodeDecommissioned,
+		ambientCtx:            opts.AmbientCtx,
+		clock:                 opts.Clock,
+		db:                    opts.DB,
+		gossip:                opts.Gossip,
+		livenessThreshold:     opts.LivenessThreshold,
+		renewalDuration:       opts.RenewalDuration,
+		selfSem:               make(chan struct{}, 1),
+		st:                    opts.Settings,
+		otherSem:              make(chan struct{}, 1),
+		heartbeatToken:        make(chan struct{}, 1),
+		onNodeDecommissioned:  opts.OnNodeDecommissioned,
+		onNodeDecommissioning: opts.OnNodeDecommissioning,
 	}
 	nl.metrics = Metrics{
 		LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
@@ -690,6 +695,10 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
 		!liveness.Draining
 }

+// OnNodeDecommissionCallback is a callback that is invoked when a node is
+// detected to be decommissioning.
+type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)
+
 // NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
 type NodeLivenessStartOptions struct {
 	Stopper *stop.Stopper
@@ -1397,6 +1406,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
 	if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
 		nl.onNodeDecommissioned(newLivenessRec.Liveness)
 	}
+	if newLivenessRec.Membership.Decommissioning() && nl.onNodeDecommissioning != nil {
+		nl.onNodeDecommissioning(newLivenessRec.NodeID)
+	}
 }

 // shouldReplaceLiveness checks to see if the new liveness is in fact newer

pkg/kv/kvserver/store.go

Lines changed: 11 additions & 1 deletion
@@ -3385,6 +3385,10 @@ func (s *Store) Enqueue(
 ) (recording tracing.Recording, processError error, enqueueError error) {
 	ctx = repl.AnnotateCtx(ctx)

+	if fn := s.TestingKnobs().EnqueueReplicaInterceptor; fn != nil {
+		fn(queueName, repl)
+	}
+
 	// Do not enqueue uninitialized replicas. The baseQueue ignores these during
 	// normal queue scheduling, but we error here to signal to the user that the
 	// operation was unsuccessful.
@@ -3428,8 +3432,14 @@
 	if async {
 		// NB: 1e6 is a placeholder for now. We want to use a high enough priority
 		// to ensure that these replicas are priority-ordered first.
+		//
+		// TODO(aayush): Once we address https://github.com/cockroachdb/cockroach/issues/79266,
+		// we can consider removing the `AddAsync` path here and just use the
+		// `MaybeAddAsync` path, which will allow us to stop specifying the
+		// priority ad-hoc.
+		const asyncEnqueuePriority = 1e6
 		if skipShouldQueue {
-			queue.AddAsync(ctx, repl, 1e6 /* prio */)
+			queue.AddAsync(ctx, repl, asyncEnqueuePriority)
 		} else {
 			queue.MaybeAddAsync(ctx, repl, repl.Clock().NowAsClockTimestamp())
 		}

pkg/kv/kvserver/testing_knobs.go

Lines changed: 3 additions & 0 deletions
@@ -403,6 +403,9 @@ type StoreTestingKnobs struct {
 	// AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the
 	// send snapshot semaphore.
 	AfterSendSnapshotThrottle func()
+
+	// EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`.
+	EnqueueReplicaInterceptor func(queueName string, replica *Replica)
 }

 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.

pkg/roachpb/metadata_replicas.go

Lines changed: 11 additions & 0 deletions
@@ -393,6 +393,17 @@ func (d ReplicaSet) ConfState() raftpb.ConfState {
 	return cs
 }

+// HasReplicaOnNode returns true iff the given nodeID is present in the
+// ReplicaSet.
+func (d ReplicaSet) HasReplicaOnNode(nodeID NodeID) bool {
+	for _, rep := range d.wrapped {
+		if rep.NodeID == nodeID {
+			return true
+		}
+	}
+	return false
+}
+
 // CanMakeProgress reports whether the given descriptors can make progress at
 // the replication layer. This is more complicated than just counting the number
 // of replicas due to the existence of joint quorums.

pkg/server/admin_test.go

Lines changed: 60 additions & 0 deletions
@@ -2374,6 +2374,66 @@ func TestDecommissionSelf(t *testing.T) {
 	}
 }

+// TestDecommissionEnqueueReplicas tests that a decommissioning node's replicas
+// are proactively enqueued into their replicateQueues by the other nodes in the
+// system.
+func TestDecommissionEnqueueReplicas(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderRace(t) // can't handle 7-node clusters
+
+	ctx := context.Background()
+	enqueuedRangeIDs := make(chan roachpb.RangeID)
+	tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Insecure: true, // allows admin client without setting up certs
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					EnqueueReplicaInterceptor: func(
+						queueName string, repl *kvserver.Replica,
+					) {
+						require.Equal(t, queueName, "replicate")
+						enqueuedRangeIDs <- repl.RangeID
+					},
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	decommissionAndCheck := func(decommissioningSrvIdx int) {
+		t.Logf("decommissioning n%d", tc.Target(decommissioningSrvIdx).NodeID)
+		// Add a scratch range's replica to a node we will decommission.
+		scratchKey := tc.ScratchRange(t)
+		decommissioningSrv := tc.Server(decommissioningSrvIdx)
+		tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
+
+		conn, err := decommissioningSrv.RPCContext().GRPCDialNode(
+			decommissioningSrv.RPCAddr(), decommissioningSrv.NodeID(), rpc.DefaultClass,
+		).Connect(ctx)
+		require.NoError(t, err)
+		adminClient := serverpb.NewAdminClient(conn)
+		decomNodeIDs := []roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()}
+		_, err = adminClient.Decommission(
+			ctx,
+			&serverpb.DecommissionRequest{
+				NodeIDs:          decomNodeIDs,
+				TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
+			},
+		)
+		require.NoError(t, err)
+
+		// Ensure that the scratch range's replica was proactively enqueued.
+		require.Equal(t, <-enqueuedRangeIDs, tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+	}
+
+	decommissionAndCheck(2 /* decommissioningSrvIdx */)
+	decommissionAndCheck(3 /* decommissioningSrvIdx */)
+	decommissionAndCheck(5 /* decommissioningSrvIdx */)
+}
+
 func TestAdminDecommissionedOperations(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/server/decommission.go

Lines changed: 79 additions & 0 deletions
@@ -13,21 +13,100 @@ package server
 import (
 	"context"
 	"sort"
+	"time"

 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"google.golang.org/grpc/codes"
 	grpcstatus "google.golang.org/grpc/status"
 )

+// decommissioningNodeMap tracks the set of nodes that we know are
+// decommissioning. This map is used to inform whether we need to proactively
+// enqueue some decommissioning node's ranges for rebalancing.
+type decommissioningNodeMap struct {
+	syncutil.RWMutex
+	nodes map[roachpb.NodeID]interface{}
+}
+
+// makeOnNodeDecommissioningCallback returns a callback that enqueues the
+// decommissioning node's ranges into the `stores`' replicateQueues for
+// rebalancing.
+func (t *decommissioningNodeMap) makeOnNodeDecommissioningCallback(
+	stores *kvserver.Stores,
+) liveness.OnNodeDecommissionCallback {
+	return func(decommissioningNodeID roachpb.NodeID) {
+		ctx := context.Background()
+		t.Lock()
+		defer t.Unlock()
+		if _, ok := t.nodes[decommissioningNodeID]; ok {
+			// We've already enqueued this node's replicas up for processing.
+			// Nothing more to do.
+			return
+		}
+
+		logLimiter := log.Every(5 * time.Second) // avoid log spam
+		if err := stores.VisitStores(func(store *kvserver.Store) error {
+			// For each range that we have a lease for, check if it has a replica
+			// on the decommissioning node. If so, proactively enqueue this replica
+			// into our local replicateQueue.
+			store.VisitReplicas(
+				func(replica *kvserver.Replica) (wantMore bool) {
+					shouldEnqueue := replica.Desc().Replicas().HasReplicaOnNode(decommissioningNodeID) &&
+						// Only bother enqueuing if we own the lease for this replica.
+						replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp())
+					if !shouldEnqueue {
+						return true /* wantMore */
+					}
+					_, processErr, enqueueErr := store.Enqueue(
+						// NB: We elide the shouldQueue check since we _know_ that the
+						// range being enqueued has replicas on a decommissioning node.
+						// Unfortunately, until
+						// https://github.com/cockroachdb/cockroach/issues/79266 is fixed,
+						// the shouldQueue() method can return false negatives (i.e. it
+						// would return false when it really shouldn't).
+						ctx, "replicate", replica, true /* skipShouldQueue */, true, /* async */
+					)
+					if processErr != nil && logLimiter.ShouldLog() {
+						// NB: The only case where we would expect to see a processErr when
+						// enqueuing a replica async is if it does not have the lease. We
+						// are checking that above, but that check is inherently racy.
+						log.Warningf(
+							ctx, "unexpected processing error when enqueuing replica asynchronously: %v", processErr,
+						)
+					}
+					if enqueueErr != nil && logLimiter.ShouldLog() {
+						log.Warningf(ctx, "unable to enqueue replica: %s", enqueueErr)
+					}
+					return true /* wantMore */
+				})
+			return nil
+		}); err != nil {
+			// We're swallowing any errors above, so this shouldn't ever happen.
+			log.Fatalf(
+				ctx, "error while nudging replicas for decommissioning node n%d", decommissioningNodeID,
+			)
+		}
+	}
+}
+
+func (t *decommissioningNodeMap) onNodeDecommissioned(nodeID roachpb.NodeID) {
+	t.Lock()
+	defer t.Unlock()
+	// NB: We may have already deleted this node, but that's ok.
+	delete(t.nodes, nodeID)
+}
+
 func getPingCheckDecommissionFn(
 	engines Engines,
 ) (*nodeTombstoneStorage, func(context.Context, roachpb.NodeID, codes.Code) error) {

pkg/server/server.go

Lines changed: 25 additions & 6 deletions
@@ -86,7 +86,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
-	sentry "github.com/getsentry/sentry-go"
+	"github.com/getsentry/sentry-go"
 	"google.golang.org/grpc/codes"
 )

@@ -398,6 +398,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		return nil, err
 	}

+	stores := kvserver.NewStores(cfg.AmbientCtx, clock)
+
+	decomNodeMap := &decommissioningNodeMap{
+		nodes: make(map[roachpb.NodeID]interface{}),
+	}
 	nodeLiveness := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{
 		AmbientCtx: cfg.AmbientCtx,
 		Clock:      clock,
@@ -407,6 +412,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		RenewalDuration:         nlRenewal,
 		Settings:                st,
 		HistogramWindowInterval: cfg.HistogramWindowInterval(),
+		// When we learn that a node is decommissioning, we want to proactively
+		// enqueue the ranges we have that also have a replica on the
+		// decommissioning node.
+		OnNodeDecommissioning: decomNodeMap.makeOnNodeDecommissioningCallback(stores),
 		OnNodeDecommissioned: func(liveness livenesspb.Liveness) {
 			if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil {
 				knobs.OnDecommissionedCallback(liveness)
@@ -416,6 +425,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 			); err != nil {
 				log.Fatalf(ctx, "unable to add tombstone for n%d: %s", liveness.NodeID, err)
 			}
+
+			decomNodeMap.onNodeDecommissioned(liveness.NodeID)
 		},
 	})
 	registry.AddMetricStruct(nodeLiveness.Metrics())
@@ -440,7 +451,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	)

 	ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
-	stores := kvserver.NewStores(cfg.AmbientCtx, clock)
 	ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)

 	// The InternalExecutor will be further initialized later, as we create more
@@ -663,10 +673,19 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	)

 	node := NewNode(
-		storeCfg, recorder, registry, stopper,
-		txnMetrics, stores, nil /* execCfg */, cfg.ClusterIDContainer,
-		gcoords.Regular.GetWorkQueue(admission.KVWork), gcoords.Stores,
-		tenantUsage, tenantSettingsWatcher, spanConfig.kvAccessor,
+		storeCfg,
+		recorder,
+		registry,
+		stopper,
+		txnMetrics,
+		stores,
+		nil,
+		cfg.ClusterIDContainer,
+		gcoords.Regular.GetWorkQueue(admission.KVWork),
+		gcoords.Stores,
+		tenantUsage,
+		tenantSettingsWatcher,
+		spanConfig.kvAccessor,
 	)
 	roachpb.RegisterInternalServer(grpcServer.Server, node)
 	kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)
