Commit 6f5122b

kvserver: retry failures to rebalance decommissioning replicas
Failures to rebalance replicas on decommissioning nodes no longer drop the replica from the replicateQueue, as they previously did. Instead, such failures now place the replica in the replicateQueue's purgatory, which retries it every minute.

This is intended to improve the speed of decommissioning towards its tail end, since previously a failure to rebalance one of these replicas meant it was only retried after about 10 minutes.

Release note: None
1 parent 9502f33 commit 6f5122b
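
The mechanism behind the change is the same marker-interface pattern the merge and split queues already use: any error carrying the PurgatoryErrorMarker method anywhere in its cause chain sends the replica to purgatory instead of dropping it from the queue. Below is a minimal, self-contained sketch of that pattern (not CockroachDB code): the names decommissionRetryError and isPurgatoryError are illustrative stand-ins for the decommissionPurgatoryError and IsPurgatoryError introduced in the diffs below, and it uses the standard library errors package rather than github.com/cockroachdb/errors.

package main

import (
    "errors"
    "fmt"
)

// purgatoryError is the marker interface: implementing the dummy method is
// what opts an error into purgatory-style retries.
type purgatoryError interface {
    error
    PurgatoryErrorMarker()
}

// decommissionRetryError embeds the underlying failure and adds the marker,
// analogous in spirit to decommissionPurgatoryError in replicate_queue.go.
type decommissionRetryError struct{ error }

func (decommissionRetryError) PurgatoryErrorMarker() {}

// isPurgatoryError reports whether any error in err's chain carries the marker.
func isPurgatoryError(err error) (purgatoryError, bool) {
    var purgErr purgatoryError
    ok := errors.As(err, &purgErr)
    return purgErr, ok
}

func main() {
    base := errors.New("snapshot rejected")
    // Tag the rebalance failure, then wrap it again to show that detection
    // works anywhere in the cause chain.
    err := fmt.Errorf("failed to rebalance decommissioning replica: %w",
        decommissionRetryError{base})

    if purgErr, ok := isPurgatoryError(err); ok {
        fmt.Println("sending to purgatory:", purgErr.Error()) // prints "snapshot rejected"
    }
}

Because detection uses errors.As, the tagged error can itself be wrapped again (for example by errors.Wrap) without losing the purgatory signal.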

File tree

6 files changed: +112 −21 lines


pkg/kv/kvserver/allocator_impl_test.go

Lines changed: 2 additions & 3 deletions
@@ -26,7 +26,6 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
-    "github.com/cockroachdb/errors"
     "go.etcd.io/etcd/raft/v3"
     "go.etcd.io/etcd/raft/v3/tracker"
 )
@@ -255,7 +254,7 @@ func TestAllocatorThrottled(t *testing.T) {
 
     // First test to make sure we would send the replica to purgatory.
     _, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
-    if !errors.HasInterface(err, (*purgatoryError)(nil)) {
+    if _, ok := IsPurgatoryError(err); !ok {
         t.Fatalf("expected a purgatory error, got: %+v", err)
     }
 
@@ -279,7 +278,7 @@ func TestAllocatorThrottled(t *testing.T) {
     storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour)
     a.StorePool.DetailsMu.Unlock()
     _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
-    if errors.HasInterface(err, (*purgatoryError)(nil)) {
+    if _, ok := IsPurgatoryError(err); ok {
         t.Fatalf("expected a non purgatory error, got: %+v", err)
     }
 }

pkg/kv/kvserver/merge_queue.go

Lines changed: 1 addition & 1 deletion
@@ -178,7 +178,7 @@ type rangeMergePurgatoryError struct{ error }
 
 func (rangeMergePurgatoryError) PurgatoryErrorMarker() {}
 
-var _ purgatoryError = rangeMergePurgatoryError{}
+var _ PurgatoryError = rangeMergePurgatoryError{}
 
 func (mq *mergeQueue) requestRangeStats(
     ctx context.Context, key roachpb.Key,

pkg/kv/kvserver/queue.go

Lines changed: 12 additions & 11 deletions
@@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc
 // the operations's timeout.
 const permittedRangeScanSlowdown = 10
 
-// a purgatoryError indicates a replica processing failure which indicates
-// the replica can be placed into purgatory for faster retries when the
-// failure condition changes.
-type purgatoryError interface {
+// PurgatoryError indicates a replica processing failure which indicates the
+// replica can be placed into purgatory for faster retries than the replica
+// scanner's interval.
+type PurgatoryError interface {
     error
     PurgatoryErrorMarker() // dummy method for unique interface
 }
@@ -385,7 +385,7 @@ type queueConfig struct {
 //
 // A queueImpl can opt into a purgatory by returning a non-nil channel from the
 // `purgatoryChan` method. A replica is put into purgatory when the `process`
-// method returns an error with a `purgatoryError` as an entry somewhere in the
+// method returns an error with a `PurgatoryError` as an entry somewhere in the
 // `Cause` chain. A replica in purgatory is not processed again until the
 // channel is signaled, at which point every replica in purgatory is immediately
 // processed. This catchup is run without the `timer` rate limiting but shares
@@ -419,7 +419,7 @@ type baseQueue struct {
     syncutil.Mutex                                // Protects all variables in the mu struct
     replicas  map[roachpb.RangeID]*replicaItem    // Map from RangeID to replicaItem
     priorityQ priorityQueue                       // The priority queue
-    purgatory map[roachpb.RangeID]purgatoryError  // Map of replicas to processing errors
+    purgatory map[roachpb.RangeID]PurgatoryError  // Map of replicas to processing errors
     stopped   bool
     // Some tests in this package disable queues.
     disabled bool
@@ -992,8 +992,9 @@ func isBenign(err error) bool {
     return errors.HasType(err, (*benignError)(nil))
 }
 
-func isPurgatoryError(err error) (purgatoryError, bool) {
-    var purgErr purgatoryError
+// IsPurgatoryError returns true iff the given error is a purgatory error.
+func IsPurgatoryError(err error) (PurgatoryError, bool) {
+    var purgErr PurgatoryError
     return purgErr, errors.As(err, &purgErr)
 }
 
@@ -1089,7 +1090,7 @@ func (bq *baseQueue) finishProcessingReplica(
     // the failing replica to purgatory. Note that even if the item was
     // scheduled to be requeued, we ignore this if we add the replica to
     // purgatory.
-    if purgErr, ok := isPurgatoryError(err); ok {
+    if purgErr, ok := IsPurgatoryError(err); ok {
         bq.mu.Lock()
         bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
         bq.mu.Unlock()
@@ -1111,7 +1112,7 @@ func (bq *baseQueue) finishProcessingReplica(
 // addToPurgatoryLocked adds the specified replica to the purgatory queue, which
 // holds replicas which have failed processing.
 func (bq *baseQueue) addToPurgatoryLocked(
-    ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr purgatoryError,
+    ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError,
 ) {
     bq.mu.AssertHeld()
 
@@ -1149,7 +1150,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
     }
 
     // Otherwise, create purgatory and start processing.
-    bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{
+    bq.mu.purgatory = map[roachpb.RangeID]PurgatoryError{
         repl.GetRangeID(): purgErr,
     }
 
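
As the queueConfig comment in the diff above notes, a queue implementation opts into purgatory by returning a non-nil channel from its purgatoryChan method; each time the channel fires, every replica in purgatory is reprocessed. A rough sketch of that opt-in for a hypothetical queue follows. The type name (myQueue) and the one-minute interval are assumptions for illustration (the commit message says decommissioning rebalance failures are retried about every minute), not the replicateQueue's actual wiring.

package kvsketch

import "time"

// myQueue is a hypothetical queue implementation that opts into purgatory.
type myQueue struct {
    purgatoryTicker *time.Ticker
}

func newMyQueue() *myQueue {
    // Assume a one-minute retry interval, matching the cadence described in
    // the commit message.
    return &myQueue{purgatoryTicker: time.NewTicker(time.Minute)}
}

// purgatoryChan returns a non-nil channel; every time it fires, the base
// queue reprocesses all replicas currently sitting in purgatory.
func (q *myQueue) purgatoryChan() <-chan time.Time {
    return q.purgatoryTicker.C
}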

pkg/kv/kvserver/replicate_queue.go

Lines changed: 30 additions & 5 deletions
@@ -540,6 +540,15 @@ func (rq *replicateQueue) process(
     return false, errors.Errorf("failed to replicate after %d retries", retryOpts.MaxRetries)
 }
 
+// decommissionPurgatoryError wraps an error that occurs when attempting to
+// rebalance a range that has a replica on a decommissioning node to indicate
+// that the error should send the range to purgatory.
+type decommissionPurgatoryError struct{ error }
+
+func (decommissionPurgatoryError) PurgatoryErrorMarker() {}
+
+var _ PurgatoryError = decommissionPurgatoryError{}
+
 func (rq *replicateQueue) processOneChange(
     ctx context.Context,
     repl *Replica,
@@ -645,8 +654,12 @@ func (rq *replicateQueue) processOneChange(
             "decommissioning voter %v unexpectedly not found in %v",
             decommissioningVoterReplicas[0], voterReplicas)
     }
-    return rq.addOrReplaceVoters(
+    requeue, err := rq.addOrReplaceVoters(
         ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
+    if err != nil {
+        return requeue, decommissionPurgatoryError{err}
+    }
+    return requeue, nil
 case allocatorimpl.AllocatorReplaceDecommissioningNonVoter:
     decommissioningNonVoterReplicas := rq.store.cfg.StorePool.DecommissioningReplicas(nonVoterReplicas)
     if len(decommissioningNonVoterReplicas) == 0 {
@@ -658,18 +671,30 @@ func (rq *replicateQueue) processOneChange(
             "decommissioning non-voter %v unexpectedly not found in %v",
             decommissioningNonVoterReplicas[0], nonVoterReplicas)
     }
-    return rq.addOrReplaceNonVoters(
+    requeue, err := rq.addOrReplaceNonVoters(
         ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
+    if err != nil {
+        return requeue, decommissionPurgatoryError{err}
+    }
+    return requeue, nil
 
 // Remove decommissioning replicas.
 //
 // NB: these two paths will only be hit when the range is over-replicated and
 // has decommissioning replicas; in the common case we'll hit
 // AllocatorReplaceDecommissioning{Non}Voter above.
 case allocatorimpl.AllocatorRemoveDecommissioningVoter:
-    return rq.removeDecommissioning(ctx, repl, allocatorimpl.VoterTarget, dryRun)
+    requeue, err := rq.removeDecommissioning(ctx, repl, allocatorimpl.VoterTarget, dryRun)
+    if err != nil {
+        return requeue, decommissionPurgatoryError{err}
+    }
+    return requeue, nil
 case allocatorimpl.AllocatorRemoveDecommissioningNonVoter:
-    return rq.removeDecommissioning(ctx, repl, allocatorimpl.NonVoterTarget, dryRun)
+    requeue, err := rq.removeDecommissioning(ctx, repl, allocatorimpl.NonVoterTarget, dryRun)
+    if err != nil {
+        return requeue, decommissionPurgatoryError{err}
+    }
+    return requeue, nil
 
 // Remove dead replicas.
 //
@@ -813,7 +838,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
     _, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters)
     if err != nil {
         // It does not seem possible to go to the next odd replica state. Note
-        // that AllocateVoter returns an allocatorError (a purgatoryError)
+        // that AllocateVoter returns an allocatorError (a PurgatoryError)
         // when purgatory is requested.
         return false, errors.Wrap(err, "avoid up-replicating to fragile quorum")
     }
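
The same wrap-on-failure pattern appears in all four decommissioning cases above. Purely as an illustration, the repetition could be captured by a small helper like the following (a hypothetical function assumed to live in the same package as decommissionPurgatoryError; it is not part of this commit):

func maybeSendToPurgatory(requeue bool, err error) (bool, error) {
    if err != nil {
        // Wrapping keeps the original error message while adding the purgatory
        // marker, so the range is retried from purgatory rather than dropped.
        return requeue, decommissionPurgatoryError{err}
    }
    return requeue, nil
}

Wrapping at these call sites, rather than inside addOrReplaceVoters or removeDecommissioning themselves, keeps the purgatory behavior scoped to the decommissioning paths; failures on other allocator actions retain their existing handling.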

pkg/kv/kvserver/replicate_queue_test.go

Lines changed: 66 additions & 0 deletions
@@ -32,7 +32,9 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
+    "github.com/cockroachdb/cockroach/pkg/rpc"
     "github.com/cockroachdb/cockroach/pkg/server"
+    "github.com/cockroachdb/cockroach/pkg/server/serverpb"
     "github.com/cockroachdb/cockroach/pkg/spanconfig"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
     "github.com/cockroachdb/cockroach/pkg/testutils"
@@ -587,6 +589,70 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
     })
 }
 
+// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
+// decommissioning replica puts it in the replicate queue purgatory.
+func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    // NB: This test injects a fake failure during replica rebalancing, and we use
+    // this `rejectSnapshots` variable as a flag to activate or deactivate that
+    // injected failure.
+    var rejectSnapshots int64
+    ctx := context.Background()
+    tc := testcluster.StartTestCluster(
+        t, 4, base.TestClusterArgs{
+            ReplicationMode: base.ReplicationManual,
+            ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
+                ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
+                    if atomic.LoadInt64(&rejectSnapshots) == 1 {
+                        return errors.Newf("boom")
+                    }
+                    return nil
+                },
+            }}},
+        },
+    )
+    defer tc.Stopper().Stop(ctx)
+
+    // Add a replica to the second and third nodes, and then decommission the
+    // second node. Since there are only 4 nodes in the cluster, the
+    // decommissioning replica must be rebalanced to the fourth node.
+    const decomNodeIdx = 1
+    const decomNodeID = 2
+    scratchKey := tc.ScratchRange(t)
+    tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx))
+    tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1))
+    adminSrv := tc.Server(decomNodeIdx)
+    conn, err := adminSrv.RPCContext().GRPCDialNode(
+        adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
+    require.NoError(t, err)
+    adminClient := serverpb.NewAdminClient(conn)
+    _, err = adminClient.Decommission(
+        ctx, &serverpb.DecommissionRequest{
+            NodeIDs:          []roachpb.NodeID{decomNodeID},
+            TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
+        },
+    )
+    require.NoError(t, err)
+
+    // Activate the above testing knob to start rejecting future rebalances and
+    // then attempt to rebalance the decommissioning replica away. We expect a
+    // purgatory error to be returned here.
+    atomic.StoreInt64(&rejectSnapshots, 1)
+    store := tc.GetFirstStoreFromServer(t, 0)
+    repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+    require.NoError(t, err)
+    _, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue(
+        ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+    )
+    require.NoError(t, enqueueErr)
+    _, isPurgErr := kvserver.IsPurgatoryError(processErr)
+    if !isPurgErr {
+        t.Fatalf("expected to receive a purgatory error, got %v", processErr)
+    }
+}
+
 // getLeaseholderStore returns the leaseholder store for the given scratchRange.
 func getLeaseholderStore(
     tc *testcluster.TestCluster, scratchRange roachpb.RangeDescriptor,

pkg/kv/kvserver/split_queue.go

Lines changed: 1 addition & 1 deletion
@@ -155,7 +155,7 @@ type unsplittableRangeError struct{}
 func (unsplittableRangeError) Error() string         { return "could not find valid split key" }
 func (unsplittableRangeError) PurgatoryErrorMarker() {}
 
-var _ purgatoryError = unsplittableRangeError{}
+var _ PurgatoryError = unsplittableRangeError{}
 
 // process synchronously invokes admin split for each proposed split key.
 func (sq *splitQueue) process(
