@@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc
100100// the operation's timeout.
101101const permittedRangeScanSlowdown = 10
102102
103- // a purgatoryError indicates a replica processing failure which indicates
104- // the replica can be placed into purgatory for faster retries when the
105- // failure condition changes .
106- type purgatoryError interface {
103+ // PurgatoryError is implemented by replica processing failures for which the
104+ // replica can be placed into purgatory, where it is retried sooner than the
105+ // replica scanner's interval would allow.
106+ type PurgatoryError interface {
107107 error
108108 PurgatoryErrorMarker () // dummy method that makes the interface unique
109109}
@@ -270,6 +270,11 @@ type queueImpl interface {
270270 // purgatory due to failures. If purgatoryChan returns nil, failing
271271 // replicas are not sent to purgatory.
272272 purgatoryChan () <- chan time.Time
273+
274+ // updateChan returns a channel that is signalled whenever there is an update
275+ // to the cluster state that might impact the replicas in the queue's
276+ // purgatory.
277+ updateChan () <- chan time.Time
273278}
274279
275280// queueProcessTimeoutFunc controls the timeout for queue processing for a
@@ -380,7 +385,7 @@ type queueConfig struct {
380385//
381386// A queueImpl can opt into a purgatory by returning a non-nil channel from the
382387// `purgatoryChan` method. A replica is put into purgatory when the `process`
383- // method returns an error with a `purgatoryError ` as an entry somewhere in the
388+ // method returns an error with a `PurgatoryError ` as an entry somewhere in the
384389// `Cause` chain. A replica in purgatory is not processed again until the
385390// channel is signaled, at which point every replica in purgatory is immediately
386391// processed. This catchup is run without the `timer` rate limiting but shares
@@ -414,7 +419,7 @@ type baseQueue struct {
414419 syncutil.Mutex // Protects all variables in the mu struct
415420 replicas map [roachpb.RangeID ]* replicaItem // Map from RangeID to replicaItem
416421 priorityQ priorityQueue // The priority queue
417- purgatory map [roachpb.RangeID ]purgatoryError // Map of replicas to processing errors
422+ purgatory map [roachpb.RangeID ]PurgatoryError // Map of replicas to processing errors
418423 stopped bool
419424 // Some tests in this package disable queues.
420425 disabled bool
@@ -987,8 +992,9 @@ func isBenign(err error) bool {
987992 return errors .HasType (err , (* benignError )(nil ))
988993}
989994
990- func isPurgatoryError (err error ) (purgatoryError , bool ) {
991- var purgErr purgatoryError
995+ // IsPurgatoryError extracts a PurgatoryError from err via errors.As and reports whether one was found.
996+ func IsPurgatoryError (err error ) (PurgatoryError , bool ) {
997+ var purgErr PurgatoryError
992998 return purgErr , errors .As (err , & purgErr )
993999}
9941000
@@ -1084,7 +1090,7 @@ func (bq *baseQueue) finishProcessingReplica(
10841090 // the failing replica to purgatory. Note that even if the item was
10851091 // scheduled to be requeued, we ignore this if we add the replica to
10861092 // purgatory.
1087- if purgErr , ok := isPurgatoryError (err ); ok {
1093+ if purgErr , ok := IsPurgatoryError (err ); ok {
10881094 bq .mu .Lock ()
10891095 bq .addToPurgatoryLocked (ctx , stopper , repl , purgErr )
10901096 bq .mu .Unlock ()
@@ -1106,7 +1112,7 @@ func (bq *baseQueue) finishProcessingReplica(
11061112// addToPurgatoryLocked adds the specified replica to the purgatory queue, which
11071113// holds replicas which have failed processing.
11081114func (bq * baseQueue ) addToPurgatoryLocked (
1109- ctx context.Context , stopper * stop.Stopper , repl replicaInQueue , purgErr purgatoryError ,
1115+ ctx context.Context , stopper * stop.Stopper , repl replicaInQueue , purgErr PurgatoryError ,
11101116) {
11111117 bq .mu .AssertHeld ()
11121118
@@ -1144,7 +1150,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
11441150 }
11451151
11461152 // Otherwise, create purgatory and start processing.
1147- bq .mu .purgatory = map [roachpb.RangeID ]purgatoryError {
1153+ bq .mu .purgatory = map [roachpb.RangeID ]PurgatoryError {
11481154 repl .GetRangeID (): purgErr ,
11491155 }
11501156
@@ -1153,51 +1159,14 @@ func (bq *baseQueue) addToPurgatoryLocked(
11531159 ticker := time .NewTicker (purgatoryReportInterval )
11541160 for {
11551161 select {
1162+ case <- bq .impl .updateChan ():
1163+ if bq .processReplicasInPurgatory (ctx , stopper ) {
1164+ return
1165+ }
11561166 case <- bq .impl .purgatoryChan ():
1157- func () {
1158- // Acquire from the process semaphore, release when done.
1159- bq .processSem <- struct {}{}
1160- defer func () { <- bq .processSem }()
1161-
1162- // Remove all items from purgatory into a copied slice.
1163- bq .mu .Lock ()
1164- ranges := make ([]* replicaItem , 0 , len (bq .mu .purgatory ))
1165- for rangeID := range bq .mu .purgatory {
1166- item := bq .mu .replicas [rangeID ]
1167- if item == nil {
1168- log .Fatalf (ctx , "r%d is in purgatory but not in replicas" , rangeID )
1169- }
1170- item .setProcessing ()
1171- ranges = append (ranges , item )
1172- bq .removeFromPurgatoryLocked (item )
1173- }
1174- bq .mu .Unlock ()
1175-
1176- for _ , item := range ranges {
1177- repl , err := bq .getReplica (item .rangeID )
1178- if err != nil || item .replicaID != repl .ReplicaID () {
1179- continue
1180- }
1181- annotatedCtx := repl .AnnotateCtx (ctx )
1182- if stopper .RunTask (
1183- annotatedCtx , bq .processOpName (), func (ctx context.Context ) {
1184- err := bq .processReplica (ctx , repl )
1185- bq .finishProcessingReplica (ctx , stopper , repl , err )
1186- }) != nil {
1187- return
1188- }
1189- }
1190- }()
1191-
1192- // Clean up purgatory, if empty.
1193- bq .mu .Lock ()
1194- if len (bq .mu .purgatory ) == 0 {
1195- log .Infof (ctx , "purgatory is now empty" )
1196- bq .mu .purgatory = nil
1197- bq .mu .Unlock ()
1167+ if bq .processReplicasInPurgatory (ctx , stopper ) {
11981168 return
11991169 }
1200- bq .mu .Unlock ()
12011170 case <- ticker .C :
12021171 // Report purgatory status.
12031172 bq .mu .Lock ()
@@ -1213,7 +1182,61 @@ func (bq *baseQueue) addToPurgatoryLocked(
12131182 return
12141183 }
12151184 }
1216- })
1185+ },
1186+ )
1187+ }
1188+
1189+ // processReplicasInPurgatory attempts to process every replica currently in
1190+ // the queue's purgatory; it returns true iff the purgatory is empty afterwards.
1191+ func (bq * baseQueue ) processReplicasInPurgatory (
1192+ ctx context.Context , stopper * stop.Stopper ,
1193+ ) (purgatoryCleared bool ) {
1194+ func () {
1195+ // Acquire from the process semaphore, release when done.
1196+ bq .processSem <- struct {}{}
1197+ defer func () { <- bq .processSem }()
1198+
1199+ // Remove all items from purgatory into a copied slice.
1200+ bq .mu .Lock ()
1201+ ranges := make ([]* replicaItem , 0 , len (bq .mu .purgatory ))
1202+ for rangeID := range bq .mu .purgatory {
1203+ item := bq .mu .replicas [rangeID ]
1204+ if item == nil {
1205+ log .Fatalf (ctx , "r%d is in purgatory but not in replicas" , rangeID )
1206+ }
1207+ item .setProcessing ()
1208+ ranges = append (ranges , item )
1209+ bq .removeFromPurgatoryLocked (item )
1210+ }
1211+ bq .mu .Unlock ()
1212+
1213+ for _ , item := range ranges {
1214+ repl , err := bq .getReplica (item .rangeID )
1215+ if err != nil || item .replicaID != repl .ReplicaID () {
1216+ continue // replica lookup failed or its replica ID changed; skip it
1217+ }
1218+ annotatedCtx := repl .AnnotateCtx (ctx )
1219+ if stopper .RunTask (
1220+ annotatedCtx , bq .processOpName (), func (ctx context.Context ) {
1221+ err := bq .processReplica (ctx , repl )
1222+ bq .finishProcessingReplica (ctx , stopper , repl , err )
1223+ },
1224+ ) != nil {
1225+ return // exits only this closure; the emptiness check below still runs
1226+ }
1227+ }
1228+ }()
1229+
1230+ // Clean up purgatory, if empty, and report that to the caller.
1231+ bq .mu .Lock ()
1232+ if len (bq .mu .purgatory ) == 0 {
1233+ log .Infof (ctx , "purgatory is now empty" )
1234+ bq .mu .purgatory = nil
1235+ bq .mu .Unlock ()
1236+ return true /* purgatoryCleared */
1237+ }
1238+ bq .mu .Unlock ()
1239+ return false /* purgatoryCleared */
1240}
12181241
12191242// pop dequeues the highest priority replica, if any, in the queue. The
0 commit comments