Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,9 @@ func (bq *baseQueue) processReplicasInPurgatory(
for _, item := range ranges {
repl, err := bq.getReplica(item.rangeID)
if err != nil || item.replicaID != repl.ReplicaID() {
bq.mu.Lock()
bq.removeFromReplicaSetLocked(item.rangeID)
bq.mu.Unlock()
continue
}
annotatedCtx := repl.AnnotateCtx(ctx)
Expand All @@ -1281,6 +1284,10 @@ func (bq *baseQueue) processReplicasInPurgatory(
bq.finishProcessingReplica(ctx, stopper, repl, err)
},
) != nil {
// NB: We do not need to worry about removing any unprocessed replicas
// from the replica set here, as RunTask will only return an error when
// the stopper is quiescing or stopping -- meaning the process is
// shutting down.
return
}
}
Expand Down
52 changes: 48 additions & 4 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,17 +904,28 @@ func TestBaseQueuePurgatory(t *testing.T) {
return nil
})

// Change the replicaID of the first the replica and destroy the second
// replica. These replicas should not be processed and should be removed from
// the replica set. The number of processed replicas will be 2 less.
const rmReplCount = 2
repls[0].replicaID = 2
if err := tc.store.RemoveReplica(ctx, repls[1], repls[1].Desc().NextReplicaID, RemoveOptions{
DestroyData: true,
}); err != nil {
t.Fatal(err)
}

// Remove error and reprocess.
testQueue.err = nil
testQueue.pChan <- timeutil.Now()

testutils.SucceedsSoon(t, func() error {
if pc := testQueue.getProcessed(); pc != replicaCount*3 {
return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3, pc)
if pc := testQueue.getProcessed(); pc != replicaCount*3-rmReplCount {
return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3-rmReplCount, pc)
}
// Check metrics.
if v := bq.successes.Count(); v != int64(replicaCount) {
return errors.Errorf("expected %d processed replicas; got %d", replicaCount, v)
if v := bq.successes.Count(); v != int64(replicaCount)-rmReplCount {
return errors.Errorf("expected %d processed replicas; got %d", replicaCount-rmReplCount, v)
}
if v := bq.failures.Count(); v != int64(replicaCount*2) {
return errors.Errorf("expected %d failed replicas; got %d", replicaCount*2, v)
Expand All @@ -925,6 +936,15 @@ func TestBaseQueuePurgatory(t *testing.T) {
if v := bq.purgatory.Value(); v != 0 {
return errors.Errorf("expected 0 purgatory replicas; got %d", v)
}
// Verify there are no replicas left in the replica set after finishing
// processing. This is within the retry loop as the above conditions can
// pass without considering the removed replicas.
bq.mu.Lock()
replicasCount := len(bq.mu.replicas)
bq.mu.Unlock()
if replicasCount != 0 {
return errors.Errorf("expected no replicas in the replica set: got %d", replicasCount)
}
return nil
})

Expand All @@ -936,6 +956,30 @@ func TestBaseQueuePurgatory(t *testing.T) {
if l := bq.Length(); l != 0 {
t.Errorf("expected empty priorityQ; got %d", l)
}

// Verify that the replica with a changed replicaID can be processed.
beforeProcessCount := testQueue.getProcessed()
beforeSuccessCount := bq.successes.Count()
beforeFailureCount := bq.failures.Count()
bq.maybeAdd(ctx, repls[0], hlc.ClockTimestamp{})
testutils.SucceedsSoon(t, func() error {
if pc := testQueue.getProcessed(); pc != beforeProcessCount+1 {
return errors.Errorf("expected %d processed replicas; got %d", beforeProcessCount+1, pc)
}
if v := bq.successes.Count(); v != beforeSuccessCount+1 {
return errors.Errorf("expected %d processed replicas; got %d", beforeSuccessCount+1, v)
}
if v := bq.failures.Count(); v != beforeFailureCount {
return errors.Errorf("expected %d failed replicas; got %d", beforeFailureCount, v)
}
if v := bq.pending.Value(); v != 0 {
return errors.Errorf("expected 0 pending replicas; got %d", v)
}
if v := bq.purgatory.Value(); v != 0 {
return errors.Errorf("expected 0 purgatory replicas; got %d", v)
}
return nil
})
}

type processTimeoutQueueImpl struct {
Expand Down