Skip to content

Commit f71ed12

Browse files
craig[bot]kvoli
andcommitted
Merge #114365
114365: kvserver: remove changed replicas in purgatory from replica set r=nvanbenschoten a=kvoli It was possible for a replica to become stuck in the processing state in a queue's replica set. This could occur when a replica had recently been removed from purgatory for processing but was destroyed, or had its replica ID changed, before being processed. When this occurred, the replica could never be processed by the queue again, potentially leading to decommission stalls, constraint violations, or under- or over-replication. Remove the replica from the queue's replica set upon encountering a replica which was destroyed, or whose replica ID changed, when processing purgatory. This prevents the replica from becoming stuck in a processing state in the replica set. Fixes: #112761 Fixes: #110761 Release note (bug fix): The store queues will no longer leave purgatory replicas that have been destroyed, or whose replica IDs have changed, stuck in a state where they cannot be processed by the respective queue again if re-added. Co-authored-by: Austen McClernon <[email protected]>
2 parents cda25d0 + a24ba7f commit f71ed12

File tree

2 files changed

+55
-4
lines changed

2 files changed

+55
-4
lines changed

pkg/kv/kvserver/queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,6 +1272,9 @@ func (bq *baseQueue) processReplicasInPurgatory(
12721272
for _, item := range ranges {
12731273
repl, err := bq.getReplica(item.rangeID)
12741274
if err != nil || item.replicaID != repl.ReplicaID() {
1275+
bq.mu.Lock()
1276+
bq.removeFromReplicaSetLocked(item.rangeID)
1277+
bq.mu.Unlock()
12751278
continue
12761279
}
12771280
annotatedCtx := repl.AnnotateCtx(ctx)
@@ -1281,6 +1284,10 @@ func (bq *baseQueue) processReplicasInPurgatory(
12811284
bq.finishProcessingReplica(ctx, stopper, repl, err)
12821285
},
12831286
) != nil {
1287+
// NB: We do not need to worry about removing any unprocessed replicas
1288+
// from the replica set here, as RunTask will only return an error when
1289+
// the stopper is quiescing or stopping -- meaning the process is
1290+
// shutting down.
12841291
return
12851292
}
12861293
}

pkg/kv/kvserver/queue_test.go

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -904,17 +904,28 @@ func TestBaseQueuePurgatory(t *testing.T) {
904904
return nil
905905
})
906906

907+
// Change the replicaID of the first replica and destroy the second
908+
// replica. These replicas should not be processed and should be removed from
909+
// the replica set. The number of processed replicas will be 2 less.
910+
const rmReplCount = 2
911+
repls[0].replicaID = 2
912+
if err := tc.store.RemoveReplica(ctx, repls[1], repls[1].Desc().NextReplicaID, RemoveOptions{
913+
DestroyData: true,
914+
}); err != nil {
915+
t.Fatal(err)
916+
}
917+
907918
// Remove error and reprocess.
908919
testQueue.err = nil
909920
testQueue.pChan <- timeutil.Now()
910921

911922
testutils.SucceedsSoon(t, func() error {
912-
if pc := testQueue.getProcessed(); pc != replicaCount*3 {
913-
return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3, pc)
923+
if pc := testQueue.getProcessed(); pc != replicaCount*3-rmReplCount {
924+
return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3-rmReplCount, pc)
914925
}
915926
// Check metrics.
916-
if v := bq.successes.Count(); v != int64(replicaCount) {
917-
return errors.Errorf("expected %d processed replicas; got %d", replicaCount, v)
927+
if v := bq.successes.Count(); v != int64(replicaCount)-rmReplCount {
928+
return errors.Errorf("expected %d processed replicas; got %d", replicaCount-rmReplCount, v)
918929
}
919930
if v := bq.failures.Count(); v != int64(replicaCount*2) {
920931
return errors.Errorf("expected %d failed replicas; got %d", replicaCount*2, v)
@@ -925,6 +936,15 @@ func TestBaseQueuePurgatory(t *testing.T) {
925936
if v := bq.purgatory.Value(); v != 0 {
926937
return errors.Errorf("expected 0 purgatory replicas; got %d", v)
927938
}
939+
// Verify there are no replicas left in the replica set after finishing
940+
// processing. This is within the retry loop as the above conditions can
941+
// pass without considering the removed replicas.
942+
bq.mu.Lock()
943+
replicasCount := len(bq.mu.replicas)
944+
bq.mu.Unlock()
945+
if replicasCount != 0 {
946+
return errors.Errorf("expected no replicas in the replica set: got %d", replicasCount)
947+
}
928948
return nil
929949
})
930950

@@ -936,6 +956,30 @@ func TestBaseQueuePurgatory(t *testing.T) {
936956
if l := bq.Length(); l != 0 {
937957
t.Errorf("expected empty priorityQ; got %d", l)
938958
}
959+
960+
// Verify that the replica with a changed replicaID can be processed.
961+
beforeProcessCount := testQueue.getProcessed()
962+
beforeSuccessCount := bq.successes.Count()
963+
beforeFailureCount := bq.failures.Count()
964+
bq.maybeAdd(ctx, repls[0], hlc.ClockTimestamp{})
965+
testutils.SucceedsSoon(t, func() error {
966+
if pc := testQueue.getProcessed(); pc != beforeProcessCount+1 {
967+
return errors.Errorf("expected %d processed replicas; got %d", beforeProcessCount+1, pc)
968+
}
969+
if v := bq.successes.Count(); v != beforeSuccessCount+1 {
970+
return errors.Errorf("expected %d processed replicas; got %d", beforeSuccessCount+1, v)
971+
}
972+
if v := bq.failures.Count(); v != beforeFailureCount {
973+
return errors.Errorf("expected %d failed replicas; got %d", beforeFailureCount, v)
974+
}
975+
if v := bq.pending.Value(); v != 0 {
976+
return errors.Errorf("expected 0 pending replicas; got %d", v)
977+
}
978+
if v := bq.purgatory.Value(); v != 0 {
979+
return errors.Errorf("expected 0 purgatory replicas; got %d", v)
980+
}
981+
return nil
982+
})
939983
}
940984

941985
type processTimeoutQueueImpl struct {

0 commit comments

Comments
 (0)