diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 3c39db58180e..26ce665202f9 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -321,7 +321,7 @@ func (ae *allocatorError) Error() string { func (*allocatorError) purgatoryErrorMarker() {} -var _ purgatoryError = &allocatorError{} +var _ PurgatoryError = &allocatorError{} // allocatorRand pairs a rand.Rand with a mutex. // NOTE: Allocator is typically only accessed from a single thread (the diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 485aa828d406..411f0377d5a2 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -7012,7 +7012,7 @@ func TestAllocatorThrottled(t *testing.T) { // First test to make sure we would send the replica to purgatory. _, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) - if !errors.HasInterface(err, (*purgatoryError)(nil)) { + if !errors.HasInterface(err, (*PurgatoryError)(nil)) { t.Fatalf("expected a purgatory error, got: %+v", err) } @@ -7036,7 +7036,7 @@ func TestAllocatorThrottled(t *testing.T) { storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour) a.storePool.detailsMu.Unlock() _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) - if errors.HasInterface(err, (*purgatoryError)(nil)) { + if errors.HasInterface(err, (*PurgatoryError)(nil)) { t.Fatalf("expected a non purgatory error, got: %+v", err) } } diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 5d1a0120e6a0..771966df1a11 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -238,3 +238,7 @@ func (q *consistencyQueue) timer(duration time.Duration) time.Duration { func (*consistencyQueue) purgatoryChan() <-chan time.Time { return nil } + +func (*consistencyQueue) updateChan() <-chan time.Time { + return nil +} diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index bd1f8a48f1d6..522b20ea3af1 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -178,7 +178,7 @@ type rangeMergePurgatoryError struct{ error } func (rangeMergePurgatoryError) purgatoryErrorMarker() {} -var _ purgatoryError = rangeMergePurgatoryError{} +var _ PurgatoryError = rangeMergePurgatoryError{} func (mq *mergeQueue) requestRangeStats( ctx context.Context, key roachpb.Key, @@ -433,3 +433,7 @@ func (mq *mergeQueue) timer(time.Duration) time.Duration { func (mq *mergeQueue) purgatoryChan() <-chan time.Time { return mq.purgChan } + +func (mq *mergeQueue) updateChan() <-chan time.Time { + return nil +} diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index d218a1389a6a..f90a5ea588a1 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -644,3 +644,7 @@ func (*mvccGCQueue) timer(_ time.Duration) time.Duration { func (*mvccGCQueue) purgatoryChan() <-chan time.Time { return nil } + +func (*mvccGCQueue) updateChan() <-chan time.Time { + return nil +} diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index e9adac48e1b9..a74c6d7ee953 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc // the operations's timeout. 
const permittedRangeScanSlowdown = 10 -// a purgatoryError indicates a replica processing failure which indicates -// the replica can be placed into purgatory for faster retries when the -// failure condition changes. -type purgatoryError interface { +// PurgatoryError indicates a replica processing failure which indicates the +// replica can be placed into purgatory for faster retries than the replica +// scanner's interval. +type PurgatoryError interface { error purgatoryErrorMarker() // dummy method for unique interface } @@ -270,6 +270,11 @@ type queueImpl interface { // purgatory due to failures. If purgatoryChan returns nil, failing // replicas are not sent to purgatory. purgatoryChan() <-chan time.Time + + // updateChan returns a channel that is signalled whenever there is an update + // to the cluster state that might impact the replicas in the queue's + // purgatory. + updateChan() <-chan time.Time } // queueProcessTimeoutFunc controls the timeout for queue processing for a @@ -380,7 +385,7 @@ type queueConfig struct { // // A queueImpl can opt into a purgatory by returning a non-nil channel from the // `purgatoryChan` method. A replica is put into purgatory when the `process` -// method returns an error with a `purgatoryError` as an entry somewhere in the +// method returns an error with a `PurgatoryError` as an entry somewhere in the // `Cause` chain. A replica in purgatory is not processed again until the // channel is signaled, at which point every replica in purgatory is immediately // processed. This catchup is run without the `timer` rate limiting but shares @@ -414,7 +419,7 @@ type baseQueue struct { syncutil.Mutex // Protects all variables in the mu struct replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem priorityQ priorityQueue // The priority queue - purgatory map[roachpb.RangeID]purgatoryError // Map of replicas to processing errors + purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors stopped bool // Some tests in this package disable queues. disabled bool @@ -987,8 +992,9 @@ func isBenign(err error) bool { return errors.HasType(err, (*benignError)(nil)) } -func isPurgatoryError(err error) (purgatoryError, bool) { - var purgErr purgatoryError +// IsPurgatoryError returns true iff the given error is a purgatory error. +func IsPurgatoryError(err error) (PurgatoryError, bool) { + var purgErr PurgatoryError return purgErr, errors.As(err, &purgErr) } @@ -1084,7 +1090,7 @@ func (bq *baseQueue) finishProcessingReplica( // the failing replica to purgatory. Note that even if the item was // scheduled to be requeued, we ignore this if we add the replica to // purgatory. - if purgErr, ok := isPurgatoryError(err); ok { + if purgErr, ok := IsPurgatoryError(err); ok { bq.mu.Lock() bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr) bq.mu.Unlock() @@ -1106,7 +1112,7 @@ func (bq *baseQueue) finishProcessingReplica( // addToPurgatoryLocked adds the specified replica to the purgatory queue, which // holds replicas which have failed processing. func (bq *baseQueue) addToPurgatoryLocked( - ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr purgatoryError, + ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError, ) { bq.mu.AssertHeld() @@ -1144,7 +1150,7 @@ func (bq *baseQueue) addToPurgatoryLocked( } // Otherwise, create purgatory and start processing. 
- bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{ + bq.mu.purgatory = map[roachpb.RangeID]PurgatoryError{ repl.GetRangeID(): purgErr, } @@ -1153,51 +1159,14 @@ func (bq *baseQueue) addToPurgatoryLocked( ticker := time.NewTicker(purgatoryReportInterval) for { select { + case <-bq.impl.updateChan(): + if bq.processReplicasInPurgatory(ctx, stopper) { + return + } case <-bq.impl.purgatoryChan(): - func() { - // Acquire from the process semaphore, release when done. - bq.processSem <- struct{}{} - defer func() { <-bq.processSem }() - - // Remove all items from purgatory into a copied slice. - bq.mu.Lock() - ranges := make([]*replicaItem, 0, len(bq.mu.purgatory)) - for rangeID := range bq.mu.purgatory { - item := bq.mu.replicas[rangeID] - if item == nil { - log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID) - } - item.setProcessing() - ranges = append(ranges, item) - bq.removeFromPurgatoryLocked(item) - } - bq.mu.Unlock() - - for _, item := range ranges { - repl, err := bq.getReplica(item.rangeID) - if err != nil || item.replicaID != repl.ReplicaID() { - continue - } - annotatedCtx := repl.AnnotateCtx(ctx) - if stopper.RunTask( - annotatedCtx, bq.processOpName(), func(ctx context.Context) { - err := bq.processReplica(ctx, repl) - bq.finishProcessingReplica(ctx, stopper, repl, err) - }) != nil { - return - } - } - }() - - // Clean up purgatory, if empty. - bq.mu.Lock() - if len(bq.mu.purgatory) == 0 { - log.Infof(ctx, "purgatory is now empty") - bq.mu.purgatory = nil - bq.mu.Unlock() + if bq.processReplicasInPurgatory(ctx, stopper) { return } - bq.mu.Unlock() case <-ticker.C: // Report purgatory status. bq.mu.Lock() @@ -1213,7 +1182,61 @@ func (bq *baseQueue) addToPurgatoryLocked( return } } - }) + }, + ) +} + +// processReplicasInPurgatory processes replicas currently in the queue's +// purgatory. +func (bq *baseQueue) processReplicasInPurgatory( + ctx context.Context, stopper *stop.Stopper, +) (purgatoryCleared bool) { + func() { + // Acquire from the process semaphore, release when done. + bq.processSem <- struct{}{} + defer func() { <-bq.processSem }() + + // Remove all items from purgatory into a copied slice. + bq.mu.Lock() + ranges := make([]*replicaItem, 0, len(bq.mu.purgatory)) + for rangeID := range bq.mu.purgatory { + item := bq.mu.replicas[rangeID] + if item == nil { + log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID) + } + item.setProcessing() + ranges = append(ranges, item) + bq.removeFromPurgatoryLocked(item) + } + bq.mu.Unlock() + + for _, item := range ranges { + repl, err := bq.getReplica(item.rangeID) + if err != nil || item.replicaID != repl.ReplicaID() { + continue + } + annotatedCtx := repl.AnnotateCtx(ctx) + if stopper.RunTask( + annotatedCtx, bq.processOpName(), func(ctx context.Context) { + err := bq.processReplica(ctx, repl) + bq.finishProcessingReplica(ctx, stopper, repl, err) + }, + ) != nil { + return + } + } + }() + + // Clean up purgatory, if empty. + bq.mu.Lock() + if len(bq.mu.purgatory) == 0 { + log.Infof(ctx, "purgatory is now empty") + bq.mu.purgatory = nil + bq.mu.Unlock() + return true /* purgatoryCleared */ + } + bq.mu.Unlock() + return false /* purgatoryCleared */ } // pop dequeues the highest priority replica, if any, in the queue. 
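For reference, a minimal sketch (not part of the patch; exampleQueue and notifyUpdate are hypothetical names) of how a queueImpl might wire the two channels introduced above: purgatoryChan bounds how often purgatory replicas are retried, while updateChan lets cluster-state changes trigger an earlier purgatory pass.

// A minimal sketch assuming the kvserver queueImpl interface shown above;
// only the two channel methods are implemented here.
package kvserver

import "time"

type exampleQueue struct {
	// purgCh fires on a fixed interval (e.g. from a time.Ticker) and bounds
	// how long purgatory replicas wait between retries.
	purgCh <-chan time.Time
	// updateCh is signalled on relevant cluster-state changes so purgatory
	// can be drained sooner than the fixed interval.
	updateCh chan time.Time
}

// purgatoryChan opts the queue into purgatory for PurgatoryError failures.
func (q *exampleQueue) purgatoryChan() <-chan time.Time { return q.purgCh }

// updateChan triggers an early purgatory pass; returning nil is valid and
// means only purgatoryChan drives retries.
func (q *exampleQueue) updateChan() <-chan time.Time { return q.updateCh }

// notifyUpdate is a hypothetical hook a caller would invoke on a cluster
// state change; the non-blocking send mirrors the replicate queue's updateFn
// in this patch.
func (q *exampleQueue) notifyUpdate(now time.Time) {
	select {
	case q.updateCh <- now:
	default:
	}
}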
diff --git a/pkg/kv/kvserver/queue_concurrency_test.go b/pkg/kv/kvserver/queue_concurrency_test.go index 10b6a350f8a0..e39345f0f7a0 100644 --- a/pkg/kv/kvserver/queue_concurrency_test.go +++ b/pkg/kv/kvserver/queue_concurrency_test.go @@ -151,6 +151,10 @@ func (fakeQueueImpl) purgatoryChan() <-chan time.Time { return time.After(time.Nanosecond) } +func (fakeQueueImpl) updateChan() <-chan time.Time { + return nil +} + type fakeReplica struct { rangeID roachpb.RangeID replicaID roachpb.ReplicaID diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index c44bfa15bd4f..e273271774ff 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -84,6 +84,10 @@ func (tq *testQueueImpl) purgatoryChan() <-chan time.Time { return tq.pChan } +func (tq *testQueueImpl) updateChan() <-chan time.Time { + return nil +} + func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue { if !cfg.acceptsUnsplitRanges { // Needed in order to pass the validation in newBaseQueue. diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go index c76f9679e063..4937839acfdb 100644 --- a/pkg/kv/kvserver/raft_log_queue.go +++ b/pkg/kv/kvserver/raft_log_queue.go @@ -754,6 +754,10 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time { return nil } +func (*raftLogQueue) updateChan() <-chan time.Time { + return nil +} + func isLooselyCoupledRaftLogTruncationEnabled( ctx context.Context, settings *cluster.Settings, ) bool { diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index c23cbe742495..abfc62ad6c3f 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -175,3 +175,7 @@ func (*raftSnapshotQueue) timer(_ time.Duration) time.Duration { func (rq *raftSnapshotQueue) purgatoryChan() <-chan time.Time { return nil } + +func (rq *raftSnapshotQueue) updateChan() <-chan time.Time { + return nil +} diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go index b64b6876f685..a62b538050f3 100644 --- a/pkg/kv/kvserver/replica_gc_queue.go +++ b/pkg/kv/kvserver/replica_gc_queue.go @@ -373,3 +373,7 @@ func (*replicaGCQueue) timer(_ time.Duration) time.Duration { func (*replicaGCQueue) purgatoryChan() <-chan time.Time { return nil } + +func (*replicaGCQueue) updateChan() <-chan time.Time { + return nil +} diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index ca4d745db0a1..bd696e217212 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -35,6 +35,12 @@ import ( ) const ( + // replicateQueuePurgatoryCheckInterval is the interval at which replicas in + // the replicate queue purgatory are re-attempted. Note that these replicas + // may be re-attempted more frequently by the replicateQueue in case there are + // gossip updates that might affect allocation decisions. + replicateQueuePurgatoryCheckInterval = 1 * time.Minute + // replicateQueueTimerDuration is the duration between replication of queued // replicas. replicateQueueTimerDuration = 0 // zero duration to process replication greedily @@ -339,18 +345,23 @@ func (metrics *ReplicateQueueMetrics) trackRebalanceReplicaCount(targetType targ // additional replica to their range. 
type replicateQueue struct { *baseQueue - metrics ReplicateQueueMetrics - allocator Allocator - updateChan chan time.Time + metrics ReplicateQueueMetrics + allocator Allocator + // purgCh is signalled every replicateQueuePurgatoryCheckInterval. + purgCh <-chan time.Time + // updateCh is signalled every time there is an update to the cluster's store + // descriptors. + updateCh chan time.Time lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines } // newReplicateQueue returns a new instance of replicateQueue. func newReplicateQueue(store *Store, allocator Allocator) *replicateQueue { rq := &replicateQueue{ - metrics: makeReplicateQueueMetrics(), - allocator: allocator, - updateChan: make(chan time.Time, 1), + metrics: makeReplicateQueueMetrics(), + allocator: allocator, + purgCh: time.NewTicker(replicateQueuePurgatoryCheckInterval).C, + updateCh: make(chan time.Time, 1), } store.metrics.registry.AddMetricStruct(&rq.metrics) rq.baseQueue = newBaseQueue( @@ -372,10 +383,9 @@ func newReplicateQueue(store *Store, allocator Allocator) *replicateQueue { purgatory: store.metrics.ReplicateQueuePurgatory, }, ) - updateFn := func() { select { - case rq.updateChan <- timeutil.Now(): + case rq.updateCh <- timeutil.Now(): default: } } @@ -517,6 +527,15 @@ func (rq *replicateQueue) process( return false, errors.Errorf("failed to replicate after %d retries", retryOpts.MaxRetries) } +// decommissionPurgatoryError wraps an error that occurs when attempting to +// rebalance a range that has a replica on a decommissioning node to indicate +// that the error should send the range to purgatory. +type decommissionPurgatoryError struct{ error } + +func (decommissionPurgatoryError) purgatoryErrorMarker() {} + +var _ PurgatoryError = decommissionPurgatoryError{} + func (rq *replicateQueue) processOneChange( ctx context.Context, repl *Replica, @@ -624,10 +643,14 @@ func (rq *replicateQueue) processOneChange( "decommissioning voter %v unexpectedly not found in %v", decommissioningVoterReplicas[0], voterReplicas) } - return rq.addOrReplaceVoters( + requeue, err := rq.addOrReplaceVoters( ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun) + if err != nil { + return requeue, decommissionPurgatoryError{err} + } + return requeue, nil case AllocatorReplaceDecommissioningNonVoter: - decommissioningNonVoterReplicas := rq.allocator.storePool.decommissioningReplicas(nonVoterReplicas) + decommissioningNonVoterReplicas := rq.store.cfg.StorePool.decommissioningReplicas(nonVoterReplicas) if len(decommissioningNonVoterReplicas) == 0 { return false, nil } @@ -637,8 +660,12 @@ func (rq *replicateQueue) processOneChange( "decommissioning non-voter %v unexpectedly not found in %v", decommissioningNonVoterReplicas[0], nonVoterReplicas) } - return rq.addOrReplaceNonVoters( + requeue, err := rq.addOrReplaceNonVoters( ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun) + if err != nil { + return requeue, decommissionPurgatoryError{err} + } + return requeue, nil // Remove decommissioning replicas. // @@ -646,9 +673,17 @@ func (rq *replicateQueue) processOneChange( // has decommissioning replicas; in the common case we'll hit // AllocatorReplaceDecommissioning{Non}Voter above. 
case AllocatorRemoveDecommissioningVoter: - return rq.removeDecommissioning(ctx, repl, voterTarget, dryRun) + requeue, err := rq.removeDecommissioning(ctx, repl, voterTarget, dryRun) + if err != nil { + return requeue, decommissionPurgatoryError{err} + } + return requeue, nil case AllocatorRemoveDecommissioningNonVoter: - return rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun) + requeue, err := rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun) + if err != nil { + return requeue, decommissionPurgatoryError{err} + } + return requeue, nil // Remove dead replicas. // @@ -792,7 +827,7 @@ func (rq *replicateQueue) addOrReplaceVoters( _, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters) if err != nil { // It does not seem possible to go to the next odd replica state. Note - // that AllocateVoter returns an allocatorError (a purgatoryError) + // that AllocateVoter returns an allocatorError (a PurgatoryError) // when purgatory is requested. return false, errors.Wrap(err, "avoid up-replicating to fragile quorum") } @@ -1627,9 +1662,13 @@ func (*replicateQueue) timer(_ time.Duration) time.Duration { return replicateQueueTimerDuration } -// purgatoryChan returns the replicate queue's store update channel. func (rq *replicateQueue) purgatoryChan() <-chan time.Time { - return rq.updateChan + return rq.purgCh +} + +// updateChan returns the replicate queue's store update channel. +func (rq *replicateQueue) updateChan() <-chan time.Time { + return rq.updateCh } // rangeRaftStatus pretty-prints the Raft progress (i.e. Raft log position) of diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 7b9f568e5120..8cb38a68d631 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -32,7 +32,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -587,6 +589,70 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { }) } +// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a +// decommissioning replica puts it in the replicate queue purgatory. +func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // NB: This test injects a fake failure during replica rebalancing, and we use + // this `rejectSnapshots` variable as a flag to activate or deactivate that + // injected failure. + var rejectSnapshots int64 + ctx := context.Background() + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ + ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error { + if atomic.LoadInt64(&rejectSnapshots) == 1 { + return errors.Newf("boom") + } + return nil + }, + }}}, + }, + ) + defer tc.Stopper().Stop(ctx) + + // Add a replica to the second and third nodes, and then decommission the + // second node. 
Since there are only 4 nodes in the cluster, the + // decommissioning replica must be rebalanced to the fourth node. + const decomNodeIdx = 1 + const decomNodeID = 2 + scratchKey := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx)) + tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1)) + adminSrv := tc.Server(decomNodeIdx) + conn, err := adminSrv.RPCContext().GRPCDialNode( + adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx) + require.NoError(t, err) + adminClient := serverpb.NewAdminClient(conn) + _, err = adminClient.Decommission( + ctx, &serverpb.DecommissionRequest{ + NodeIDs: []roachpb.NodeID{decomNodeID}, + TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING, + }, + ) + require.NoError(t, err) + + // Activate the above testing knob to start rejecting future rebalances and + // then attempt to rebalance the decommissioning replica away. We expect a + // purgatory error to be returned here. + atomic.StoreInt64(&rejectSnapshots, 1) + store := tc.GetFirstStoreFromServer(t, 0) + repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID) + require.NoError(t, err) + _, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).ManuallyEnqueue( + ctx, "replicate", repl, true, /* skipShouldQueue */ + ) + require.NoError(t, enqueueErr) + _, isPurgErr := kvserver.IsPurgatoryError(processErr) + if !isPurgErr { + t.Fatalf("expected to receive a purgatory error, got %v", processErr) + } +} + // getLeaseholderStore returns the leaseholder store for the given scratchRange. func getLeaseholderStore( tc *testcluster.TestCluster, scratchRange roachpb.RangeDescriptor, diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index a611a335ac73..493f332d1d1e 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -155,7 +155,7 @@ type unsplittableRangeError struct{} func (unsplittableRangeError) Error() string { return "could not find valid split key" } func (unsplittableRangeError) purgatoryErrorMarker() {} -var _ purgatoryError = unsplittableRangeError{} +var _ PurgatoryError = unsplittableRangeError{} // process synchronously invokes admin split for each proposed split key. func (sq *splitQueue) process( @@ -274,3 +274,7 @@ func (*splitQueue) timer(_ time.Duration) time.Duration { func (sq *splitQueue) purgatoryChan() <-chan time.Time { return sq.purgChan } + +func (sq *splitQueue) updateChan() <-chan time.Time { + return nil +} diff --git a/pkg/kv/kvserver/ts_maintenance_queue.go b/pkg/kv/kvserver/ts_maintenance_queue.go index f0476e1895f4..4f9e07b75e62 100644 --- a/pkg/kv/kvserver/ts_maintenance_queue.go +++ b/pkg/kv/kvserver/ts_maintenance_queue.go @@ -178,3 +178,7 @@ func (q *timeSeriesMaintenanceQueue) timer(duration time.Duration) time.Duration func (*timeSeriesMaintenanceQueue) purgatoryChan() <-chan time.Time { return nil } + +func (*timeSeriesMaintenanceQueue) updateChan() <-chan time.Time { + return nil +}
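For reference, a minimal sketch (not part of the patch; examplePurgatoryError and wrapForPurgatory are hypothetical names) of the purgatory-error pattern the patch relies on, mirroring decommissionPurgatoryError above and the purgatoryErrorMarker convention used by the other queues.

// A minimal sketch assuming the PurgatoryError interface defined in queue.go.
package kvserver

// examplePurgatoryError wraps a transient failure so the base queue parks the
// replica in purgatory rather than dropping it until the next scanner pass.
type examplePurgatoryError struct{ error }

func (examplePurgatoryError) purgatoryErrorMarker() {}

var _ PurgatoryError = examplePurgatoryError{}

// wrapForPurgatory shows how a queue's process path might wrap an error it
// considers retryable once cluster state changes.
func wrapForPurgatory(err error) error {
	if err != nil {
		return examplePurgatoryError{err}
	}
	return nil
}

Callers outside the package can then detect such failures anywhere in the error's Cause chain via the newly exported helper, as the test above does with kvserver.IsPurgatoryError(processErr).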