Commit c848e3d

server: react to decommissioning nodes by proactively enqueuing their replicas
Note: This patch implements a subset of cockroachdb#80836.

Previously, when a node was marked `DECOMMISSIONING`, other nodes in the system would learn about it via gossip but wouldn't do much in the way of reacting to it. They'd rely on their `replicaScanner` to gradually run into the decommissioning node's ranges and on their `replicateQueue` to then rebalance them.

This meant that even when decommissioning a mostly empty node, our worst-case lower bound for marking that node fully decommissioned was _one full scanner interval_ (which is 10 minutes by default).

This patch improves this behavior by installing an idempotent callback that is invoked every time a node is detected to be `DECOMMISSIONING`. When it runs, the callback enqueues all the replicas on the local stores that belong to ranges that also have a replica on the decommissioning node.

Release note (performance improvement): Decommissioning should now be substantially faster, particularly for small to moderately loaded nodes.
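The mechanism described above is small enough to sketch in isolation: a mutex-guarded set of node IDs makes the expensive "enqueue everything" work run at most once per decommissioning node, no matter how many times gossip re-delivers the liveness record. Below is a minimal, self-contained sketch of that pattern; the names (`nodeSet`, `runOnce`, `enqueueReplicasFor`) are illustrative only and are not the patch's actual types.

package main

import (
	"fmt"
	"sync"
)

type NodeID int

// nodeSet makes a per-node action idempotent: repeated calls for the same
// node ID are no-ops until the node is forgotten again.
type nodeSet struct {
	mu    sync.Mutex
	nodes map[NodeID]struct{}
}

// runOnce invokes fn for nodeID unless it has already been invoked.
func (s *nodeSet) runOnce(nodeID NodeID, fn func(NodeID)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.nodes[nodeID]; ok {
		return // already reacted to this decommissioning node
	}
	s.nodes[nodeID] = struct{}{}
	fn(nodeID)
}

// forget clears the entry, e.g. once the node is fully decommissioned.
func (s *nodeSet) forget(nodeID NodeID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.nodes, nodeID)
}

func main() {
	s := &nodeSet{nodes: make(map[NodeID]struct{})}
	enqueueReplicasFor := func(id NodeID) { fmt.Printf("enqueueing replicas for n%d\n", id) }
	s.runOnce(4, enqueueReplicasFor) // does the work
	s.runOnce(4, enqueueReplicasFor) // no-op: gossip re-delivered the record
	s.forget(4)                      // n4 is fully decommissioned
}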
Parent: bbfb210

File tree (7 files changed: +212 −14 lines):

  pkg/kv/kvserver/liveness/liveness.go
  pkg/kv/kvserver/store.go
  pkg/kv/kvserver/testing_knobs.go
  pkg/roachpb/metadata_replicas.go
  pkg/server/admin_test.go
  pkg/server/decommission.go
  pkg/server/server.go

pkg/kv/kvserver/liveness/liveness.go
Lines changed: 20 additions & 5 deletions

@@ -196,11 +196,12 @@ type NodeLiveness struct {
 	// heartbeatPaused contains an atomically-swapped number representing a bool
 	// (1 or 0). heartbeatToken is a channel containing a token which is taken
 	// when heartbeating or when pausing the heartbeat. Used for testing.
-	heartbeatPaused      uint32
-	heartbeatToken       chan struct{}
-	metrics              Metrics
-	onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
-	engineSyncs          singleflight.Group
+	heartbeatPaused       uint32
+	heartbeatToken        chan struct{}
+	metrics               Metrics
+	onNodeDecommissioned  func(livenesspb.Liveness) // noop if nil
+	onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
+	engineSyncs           singleflight.Group
 
 	mu struct {
 		syncutil.RWMutex
@@ -279,6 +280,9 @@ type NodeLivenessOptions struct {
 	// idempotent as it may be invoked multiple times and defaults to a
 	// noop.
 	OnNodeDecommissioned func(livenesspb.Liveness)
+	// OnNodeDecommissioning is invoked when a node is detected to be
+	// decommissioning.
+	OnNodeDecommissioning OnNodeDecommissionCallback
 }
 
 // NewNodeLiveness returns a new instance of NodeLiveness configured
@@ -696,6 +700,10 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
 		!liveness.Draining
 }
 
+// OnNodeDecommissionCallback is a callback that is invoked when a node is
+// detected to be decommissioning.
+type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)
+
 // NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
 type NodeLivenessStartOptions struct {
 	Engines []storage.Engine
@@ -1397,6 +1405,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
 
 	var shouldReplace bool
 	nl.mu.Lock()
+
+	// NB: shouldReplace will always be true right after a node restarts since the
+	// `nodes` map will be empty. This means that the callbacks called below will
+	// always be invoked at least once after node restarts.
 	oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID)
 	if !ok {
 		shouldReplace = true
@@ -1424,6 +1436,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
 	if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
 		nl.onNodeDecommissioned(newLivenessRec.Liveness)
 	}
+	if newLivenessRec.Membership.Decommissioning() && nl.onNodeDecommissioning != nil {
+		nl.onNodeDecommissioning(newLivenessRec.NodeID)
+	}
 }
 
 // shouldReplaceLiveness checks to see if the new liveness is in fact newer
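The `maybeUpdate` hunks above gate the callbacks on `shouldReplace`, and the new comment points out that a freshly restarted node fires them at least once because its in-memory cache starts out empty. A toy model of that dispatch logic follows; the real `shouldReplaceLiveness` also compares expirations and membership status, while this sketch only uses the epoch.

package main

import "fmt"

type NodeID int

type livenessRecord struct {
	nodeID          NodeID
	epoch           int64
	decommissioning bool
}

// cache mimics the gossip-updated liveness cache: callbacks fire only when
// an incoming record replaces the cached one. Since the cache is empty right
// after a restart, each node's current record triggers the callbacks at
// least once.
type cache struct {
	nodes             map[NodeID]livenessRecord
	onDecommissioning func(NodeID)
}

func (c *cache) maybeUpdate(next livenessRecord) {
	old, ok := c.nodes[next.nodeID]
	shouldReplace := !ok || next.epoch > old.epoch
	if !shouldReplace {
		return
	}
	c.nodes[next.nodeID] = next
	if next.decommissioning && c.onDecommissioning != nil {
		c.onDecommissioning(next.nodeID)
	}
}

func main() {
	c := &cache{
		nodes:             make(map[NodeID]livenessRecord),
		onDecommissioning: func(id NodeID) { fmt.Printf("n%d is decommissioning\n", id) },
	}
	rec := livenessRecord{nodeID: 4, epoch: 7, decommissioning: true}
	c.maybeUpdate(rec) // fires: the cache is empty, as it is right after a restart
	c.maybeUpdate(rec) // no-op: the record does not replace the cached one
}

This is also why the new `OnNodeDecommissioning` hook has to be idempotent: the same node's record can replace the cached entry repeatedly as epochs and expirations advance.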

pkg/kv/kvserver/store.go
Lines changed: 14 additions & 3 deletions

@@ -3408,6 +3408,10 @@ func (s *Store) Enqueue(
 ) (recording tracing.Recording, processError error, enqueueError error) {
 	ctx = repl.AnnotateCtx(ctx)
 
+	if fn := s.TestingKnobs().EnqueueReplicaInterceptor; fn != nil {
+		fn(queueName, repl)
+	}
+
 	// Do not enqueue uninitialized replicas. The baseQueue ignores these during
 	// normal queue scheduling, but we error here to signal to the user that the
 	// operation was unsuccessful.
@@ -3445,10 +3449,17 @@ func (s *Store) Enqueue(
 	}
 
 	if async {
-		// NB: 1e6 is a placeholder for now. We want to use a high enough priority
-		// to ensure that these replicas are priority-ordered first.
+		// NB: 1e5 is a placeholder for now. We want to use a high enough priority
+		// to ensure that these replicas are priority-ordered first (just below the
+		// replacement of dead replicas).
+		//
+		// TODO(aayush): Once we address
+		// https://github.com/cockroachdb/cockroach/issues/79266, we can consider
+		// removing the `AddAsync` path here and just use the `MaybeAddAsync` path,
+		// which will allow us to stop specifying the priority ad-hoc.
+		const asyncEnqueuePriority = 1e5
 		if skipShouldQueue {
-			queue.AddAsync(ctx, repl, 1e6 /* prio */)
+			queue.AddAsync(ctx, repl, asyncEnqueuePriority)
 		} else {
 			queue.MaybeAddAsync(ctx, repl, repl.Clock().NowAsClockTimestamp())
 		}
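The `asyncEnqueuePriority` constant only matters relative to other queued work: a larger number means the replica is processed earlier. A toy max-heap shows the intended effect of handing these replicas a large priority. The 1e5 value is from the patch; the queue here is an illustrative stand-in, not the real `baseQueue`.

package main

import (
	"container/heap"
	"fmt"
)

// item is a unit of queued work with a priority.
type item struct {
	name string
	prio float64
}

// pq is a max-heap over items: higher priority pops first.
type pq []item

func (q pq) Len() int            { return len(q) }
func (q pq) Less(i, j int) bool  { return q[i].prio > q[j].prio }
func (q pq) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *pq) Push(x interface{}) { *q = append(*q, x.(item)) }
func (q *pq) Pop() interface{} {
	old := *q
	n := len(old)
	it := old[n-1]
	*q = old[:n-1]
	return it
}

func main() {
	const asyncEnqueuePriority = 1e5 // mirrors the constant in store.Enqueue
	q := &pq{}
	heap.Push(q, item{name: "scanner-found range", prio: 1})
	heap.Push(q, item{name: "decommission nudge", prio: asyncEnqueuePriority})
	for q.Len() > 0 {
		fmt.Println(heap.Pop(q).(item).name)
	}
	// Prints "decommission nudge" before "scanner-found range": the
	// proactively enqueued replicas jump ahead of routine scanner work.
}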

pkg/kv/kvserver/testing_knobs.go
Lines changed: 3 additions & 0 deletions

@@ -406,6 +406,9 @@ type StoreTestingKnobs struct {
 	// AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the
 	// send snapshot semaphore.
	AfterSendSnapshotThrottle func()
+
+	// EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`.
+	EnqueueReplicaInterceptor func(queueName string, replica *Replica)
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
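The new knob is exercised by `TestDecommissionEnqueueReplicas` in pkg/server/admin_test.go below, which uses it to funnel every enqueued range ID through a channel and to assert that the enqueue targeted the replicate queue.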

pkg/roachpb/metadata_replicas.go
Lines changed: 11 additions & 0 deletions

@@ -393,6 +393,17 @@ func (d ReplicaSet) ConfState() raftpb.ConfState {
 	return cs
 }
 
+// HasReplicaOnNode returns true iff the given nodeID is present in the
+// ReplicaSet.
+func (d ReplicaSet) HasReplicaOnNode(nodeID NodeID) bool {
+	for _, rep := range d.wrapped {
+		if rep.NodeID == nodeID {
+			return true
+		}
+	}
+	return false
+}
+
 // CanMakeProgress reports whether the given descriptors can make progress at
 // the replication layer. This is more complicated than just counting the number
 // of replicas due to the existence of joint quorums.
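A quick usage sketch for the new helper, written as a Go example test. This assumes `roachpb.MakeReplicaSet`, the existing constructor that wraps a slice of descriptors into a `ReplicaSet`; the example itself is not part of the patch.

package roachpb_test

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func ExampleReplicaSet_HasReplicaOnNode() {
	// A range with replicas on n1 and n4.
	rs := roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{
		{NodeID: 1, StoreID: 1, ReplicaID: 1},
		{NodeID: 4, StoreID: 4, ReplicaID: 2},
	})
	fmt.Println(rs.HasReplicaOnNode(4), rs.HasReplicaOnNode(7))
	// Output: true false
}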

pkg/server/admin_test.go
Lines changed: 60 additions & 0 deletions

@@ -2391,6 +2391,66 @@ func TestDecommissionSelf(t *testing.T) {
 	}
 }
 
+// TestDecommissionEnqueueReplicas tests that a decommissioning node's replicas
+// are proactively enqueued into their replicateQueues by the other nodes in the
+// system.
+func TestDecommissionEnqueueReplicas(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderRace(t) // can't handle 7-node clusters
+
+	ctx := context.Background()
+	enqueuedRangeIDs := make(chan roachpb.RangeID)
+	tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Insecure: true, // allows admin client without setting up certs
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					EnqueueReplicaInterceptor: func(
+						queueName string, repl *kvserver.Replica,
+					) {
+						require.Equal(t, queueName, "replicate")
+						enqueuedRangeIDs <- repl.RangeID
+					},
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	decommissionAndCheck := func(decommissioningSrvIdx int) {
+		t.Logf("decommissioning n%d", tc.Target(decommissioningSrvIdx).NodeID)
+		// Add a scratch range's replica to a node we will decommission.
+		scratchKey := tc.ScratchRange(t)
+		decommissioningSrv := tc.Server(decommissioningSrvIdx)
+		tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
+
+		conn, err := decommissioningSrv.RPCContext().GRPCDialNode(
+			decommissioningSrv.RPCAddr(), decommissioningSrv.NodeID(), rpc.DefaultClass,
+		).Connect(ctx)
+		require.NoError(t, err)
+		adminClient := serverpb.NewAdminClient(conn)
+		decomNodeIDs := []roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()}
+		_, err = adminClient.Decommission(
+			ctx,
+			&serverpb.DecommissionRequest{
+				NodeIDs:          decomNodeIDs,
+				TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
+			},
+		)
+		require.NoError(t, err)
+
+		// Ensure that the scratch range's replica was proactively enqueued.
+		require.Equal(t, <-enqueuedRangeIDs, tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+	}
+
+	decommissionAndCheck(2 /* decommissioningSrvIdx */)
+	decommissionAndCheck(3 /* decommissioningSrvIdx */)
+	decommissionAndCheck(5 /* decommissioningSrvIdx */)
+}
+
 func TestAdminDecommissionedOperations(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/server/decommission.go
Lines changed: 79 additions & 0 deletions

@@ -13,21 +13,100 @@ package server
 import (
 	"context"
 	"sort"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"google.golang.org/grpc/codes"
 	grpcstatus "google.golang.org/grpc/status"
 )
 
+// decommissioningNodeMap tracks the set of nodes that we know are
+// decommissioning. This map is used to inform whether we need to proactively
+// enqueue some decommissioning node's ranges for rebalancing.
+type decommissioningNodeMap struct {
+	syncutil.RWMutex
+	nodes map[roachpb.NodeID]interface{}
+}
+
+// makeOnNodeDecommissioningCallback returns a callback that enqueues the
+// decommissioning node's ranges into the `stores`' replicateQueues for
+// rebalancing.
+func (t *decommissioningNodeMap) makeOnNodeDecommissioningCallback(
+	stores *kvserver.Stores,
+) liveness.OnNodeDecommissionCallback {
+	return func(decommissioningNodeID roachpb.NodeID) {
+		ctx := context.Background()
+		t.Lock()
+		defer t.Unlock()
+		if _, ok := t.nodes[decommissioningNodeID]; ok {
+			// We've already enqueued this node's replicas up for processing.
+			// Nothing more to do.
+			return
+		}
+
+		logLimiter := log.Every(5 * time.Second) // avoid log spam
+		if err := stores.VisitStores(func(store *kvserver.Store) error {
+			// For each range that we have a lease for, check if it has a replica
+			// on the decommissioning node. If so, proactively enqueue this replica
+			// into our local replicateQueue.
+			store.VisitReplicas(
+				func(replica *kvserver.Replica) (wantMore bool) {
+					shouldEnqueue := replica.Desc().Replicas().HasReplicaOnNode(decommissioningNodeID) &&
+						// Only bother enqueuing if we own the lease for this replica.
+						replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp())
+					if !shouldEnqueue {
+						return true /* wantMore */
+					}
+					_, processErr, enqueueErr := store.Enqueue(
+						// NB: We elide the shouldQueue check since we _know_ that the
+						// range being enqueued has replicas on a decommissioning node.
+						// Unfortunately, until
+						// https://github.com/cockroachdb/cockroach/issues/79266 is fixed,
+						// the shouldQueue() method can return false negatives (i.e. it
+						// would return false when it really shouldn't).
+						ctx, "replicate", replica, true /* skipShouldQueue */, true, /* async */
+					)
+					if processErr != nil && logLimiter.ShouldLog() {
+						// NB: The only case where we would expect to see a processErr when
+						// enqueuing a replica async is if it does not have the lease. We
+						// are checking that above, but that check is inherently racy.
+						log.Warningf(
+							ctx, "unexpected processing error when enqueuing replica asynchronously: %v", processErr,
+						)
+					}
+					if enqueueErr != nil && logLimiter.ShouldLog() {
+						log.Warningf(ctx, "unable to enqueue replica: %s", enqueueErr)
+					}
+					return true /* wantMore */
+				})
+			return nil
+		}); err != nil {
+			// We're swallowing any errors above, so this shouldn't ever happen.
+			log.Fatalf(
+				ctx, "error while nudging replicas for decommissioning node n%d", decommissioningNodeID,
+			)
+		}
+	}
+}
+
+func (t *decommissioningNodeMap) onNodeDecommissioned(nodeID roachpb.NodeID) {
+	t.Lock()
+	defer t.Unlock()
+	// NB: We may have already deleted this node, but that's ok.
+	delete(t.nodes, nodeID)
+}
+
 func getPingCheckDecommissionFn(
 	engines Engines,
 ) (*nodeTombstoneStorage, func(context.Context, roachpb.NodeID, codes.Code) error) {
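Two details of the callback above are worth calling out: only leaseholders react, so each affected range is nudged by exactly one node in the cluster, and the range must actually have a replica on the decommissioning node. A self-contained sketch of just that filter follows; the types here are toys, while the real code walks `kvserver.Stores` and live replicas.

package main

import "fmt"

type NodeID int

type rangeInfo struct {
	rangeID   int
	nodes     []NodeID // nodes holding a replica of this range
	ownsLease bool     // does the local store hold the range lease?
}

// shouldEnqueue mirrors the callback's filter: nudge a range only if it has
// a replica on the decommissioning node and we hold its lease, so that
// exactly one node in the cluster enqueues each affected range.
func shouldEnqueue(r rangeInfo, decommissioning NodeID) bool {
	if !r.ownsLease {
		return false
	}
	for _, n := range r.nodes {
		if n == decommissioning {
			return true
		}
	}
	return false
}

func main() {
	ranges := []rangeInfo{
		{rangeID: 1, nodes: []NodeID{1, 2, 3}, ownsLease: true},
		{rangeID: 2, nodes: []NodeID{2, 4, 5}, ownsLease: true},
		{rangeID: 3, nodes: []NodeID{3, 4, 6}, ownsLease: false},
	}
	for _, r := range ranges {
		if shouldEnqueue(r, 4) {
			fmt.Printf("enqueue r%d into the replicate queue\n", r.rangeID)
		}
	}
	// Only r2 is enqueued here: r1 has no replica on n4, and r3's lease is
	// held elsewhere, so its leaseholder is the one that enqueues it.
}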

pkg/server/server.go
Lines changed: 25 additions & 6 deletions

@@ -86,7 +86,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
-	sentry "github.com/getsentry/sentry-go"
+	"github.com/getsentry/sentry-go"
 	"google.golang.org/grpc/codes"
 )
 
@@ -398,6 +398,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		return nil, err
 	}
 
+	stores := kvserver.NewStores(cfg.AmbientCtx, clock)
+
+	decomNodeMap := &decommissioningNodeMap{
+		nodes: make(map[roachpb.NodeID]interface{}),
+	}
 	nodeLiveness := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{
 		AmbientCtx: cfg.AmbientCtx,
 		Stopper:    stopper,
@@ -408,6 +413,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		RenewalDuration:         nlRenewal,
 		Settings:                st,
 		HistogramWindowInterval: cfg.HistogramWindowInterval(),
+		// When we learn that a node is decommissioning, we want to proactively
+		// enqueue the ranges we have that also have a replica on the
+		// decommissioning node.
+		OnNodeDecommissioning: decomNodeMap.makeOnNodeDecommissioningCallback(stores),
 		OnNodeDecommissioned: func(liveness livenesspb.Liveness) {
 			if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil {
 				knobs.OnDecommissionedCallback(liveness)
@@ -417,6 +426,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 			); err != nil {
 				log.Fatalf(ctx, "unable to add tombstone for n%d: %s", liveness.NodeID, err)
 			}
+
+			decomNodeMap.onNodeDecommissioned(liveness.NodeID)
 		},
 	})
 	registry.AddMetricStruct(nodeLiveness.Metrics())
@@ -441,7 +452,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	)
 
 	ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
-	stores := kvserver.NewStores(cfg.AmbientCtx, clock)
 	ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
 
 	// The InternalExecutor will be further initialized later, as we create more
@@ -665,10 +675,19 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	)
 
 	node := NewNode(
-		storeCfg, recorder, registry, stopper,
-		txnMetrics, stores, nil /* execCfg */, cfg.ClusterIDContainer,
-		gcoords.Regular.GetWorkQueue(admission.KVWork), gcoords.Stores,
-		tenantUsage, tenantSettingsWatcher, spanConfig.kvAccessor,
+		storeCfg,
+		recorder,
+		registry,
+		stopper,
+		txnMetrics,
+		stores,
+		nil,
+		cfg.ClusterIDContainer,
+		gcoords.Regular.GetWorkQueue(admission.KVWork),
+		gcoords.Stores,
+		tenantUsage,
+		tenantSettingsWatcher,
+		spanConfig.kvAccessor,
 	)
 	roachpb.RegisterInternalServer(grpcServer.Server, node)
 	kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)
