
Commit c9cf068

Author: Andrew Baptist
kvserver: consolidate queue checks
Previously, the queue checks were not done consistently in all places. This commit adds the method `replicaCanBeProcessed` and uses it before all queueing and processing of replicas.

Epic: none
Release note: None
1 parent d26a180 commit c9cf068
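
The pattern the commit establishes is simple: one gate function holds every precondition, and each call site checks the gate before doing any work. Below is a minimal, runnable sketch of that gate-then-process shape; the replica struct and helper names are simplified stand-ins invented for illustration, not the kvserver API:

package main

import (
	"context"
	"errors"
	"fmt"
)

// replica is a simplified stand-in for the real replicaInQueue
// interface; none of these names are the actual kvserver API.
type replica struct {
	id          int
	initialized bool
}

// canBeProcessed plays the role of baseQueue.replicaCanBeProcessed:
// all preconditions live in one function, so every call site applies
// the same checks.
func canBeProcessed(ctx context.Context, r *replica) error {
	if !r.initialized {
		return errors.New("cannot process uninitialized replica")
	}
	return nil
}

func process(ctx context.Context, r *replica) error {
	fmt.Printf("processing replica %d\n", r.id)
	return nil
}

func main() {
	ctx := context.Background()
	replicas := []*replica{{id: 1, initialized: true}, {id: 2}}
	for _, r := range replicas {
		// Gate first, then process: the shape used by maybeAdd,
		// processLoop, DrainQueue, and the purgatory loop.
		if err := canBeProcessed(ctx, r); err != nil {
			fmt.Printf("skipping replica %d: %v\n", r.id, err)
			continue
		}
		if err := process(ctx, r); err != nil {
			fmt.Printf("replica %d failed: %v\n", r.id, err)
		}
	}
}

In the commit itself this shape appears in maybeAdd, processLoop, DrainQueue, the purgatory loop, and adminScatter, all routed through baseQueue.replicaCanBeProcessed.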

File tree

6 files changed: +145, -117 lines


pkg/kv/kvserver/queue.go

Lines changed: 135 additions & 112 deletions
@@ -253,7 +253,7 @@ type replicaInQueue interface {
 	IsDestroyed() (DestroyReason, error)
 	Desc() *roachpb.RangeDescriptor
 	redirectOnOrAcquireLease(context.Context) (kvserverpb.LeaseStatus, *kvpb.Error)
-	LeaseStatusAt(context.Context, hlc.ClockTimestamp) kvserverpb.LeaseStatus
+	CurrentLeaseStatus(context.Context) kvserverpb.LeaseStatus
 }

 type queueImpl interface {
@@ -322,7 +322,8 @@ type queueConfig struct {
 	// This is to avoid giving the queue a replica that spans multiple config
 	// zones (which might make the action of the queue ambiguous - e.g. we don't
 	// want to try to replicate a range until we know which zone it is in and
-	// therefore how many replicas are required).
+	// therefore how many replicas are required). If needsSpanConfigs is not
+	// set, then this setting is ignored.
 	acceptsUnsplitRanges bool
 	// processDestroyedReplicas controls whether or not we want to process
 	// replicas that have been destroyed but not GCed.
@@ -645,19 +646,6 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
 		fn(ctx, bq)
 	}

-	// Load the system config if it's needed.
-	var confReader spanconfig.StoreReader
-	if bq.needsSpanConfigs {
-		var err error
-		confReader, err = bq.store.GetConfReader(ctx)
-		if err != nil {
-			if errors.Is(err, errSpanConfigsUnavailable) && log.V(1) {
-				log.Warningf(ctx, "unable to retrieve span configs, skipping: %v", err)
-			}
-			return
-		}
-	}
-
 	bq.mu.Lock()
 	stopped := bq.mu.stopped
 	disabled := bq.mu.disabled
@@ -677,37 +665,12 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
 		}
 	}

-	if !repl.IsInitialized() {
+	// Load the system config if it's needed.
+	confReader, err := bq.replicaCanBeProcessed(ctx, repl, false /* acquireLeaseIfNeeded */)
+	if err != nil {
 		return
 	}

-	if !bq.acceptsUnsplitRanges {
-		// Queue does not accept unsplit ranges. Check to see if the range needs to
-		// be split because of spanconfigs.
-		needsSplit, err := confReader.NeedsSplit(ctx, repl.Desc().StartKey, repl.Desc().EndKey)
-		if err != nil {
-			log.Warningf(ctx, "unable to compute whether split is needed; not adding")
-			return
-		}
-		if needsSplit {
-			if log.V(1) {
-				log.Infof(ctx, "split needed; not adding")
-			}
-			return
-		}
-	}
-
-	if bq.needsLease {
-		// Check to see if either we own the lease or do not know who the lease
-		// holder is.
-		st := repl.LeaseStatusAt(ctx, now)
-		if st.IsValid() && !st.OwnedBy(repl.StoreID()) {
-			if log.V(1) {
-				log.Infof(ctx, "needs lease; not adding: %v", st.Lease)
-			}
-			return
-		}
-	}
 	// NB: in production code, this type assertion is always true. In tests,
 	// it may not be and shouldQueue will be passed a nil realRepl. These tests
 	// know what they're getting into so that's fine.
@@ -716,7 +679,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
 	if !should {
 		return
 	}
-	_, err := bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
+	_, err = bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
 	if !isExpectedQueueError(err) {
 		log.Errorf(ctx, "unable to add: %+v", err)
 	}
@@ -895,6 +858,14 @@ func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
 			repl, priority := bq.pop()
 			if repl != nil {
 				annotatedCtx := repl.AnnotateCtx(ctx)
+				_, err := bq.replicaCanBeProcessed(annotatedCtx, repl, false /* acquireLeaseIfNeeded */)
+				if err != nil {
+					bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
+					log.Infof(ctx, "skipping since replica can't be processed: %v", err)
+					// Release the semaphore if the replica can't be processed.
+					<-bq.processSem
+					continue
+				}
 				if stopper.RunAsyncTaskEx(annotatedCtx, stop.TaskOpts{
 					TaskName: bq.processOpName() + " [outer]",
 				},
@@ -960,80 +931,30 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duration) {
 // ctx should already be annotated by both bq.AnnotateCtx() and
 // repl.AnnotateCtx().
 func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
-	// Load the system config if it's needed.
-	var confReader spanconfig.StoreReader
-	if bq.needsSpanConfigs {
-		var err error
-		confReader, err = bq.store.GetConfReader(ctx)
-		if errors.Is(err, errSpanConfigsUnavailable) {
-			if log.V(1) {
-				log.Warningf(ctx, "unable to retrieve conf reader, skipping: %v", err)
-			}
-			return nil
-		}
-		if err != nil {
-			return err
-		}
-	}

-	if !bq.acceptsUnsplitRanges {
-		// Queue does not accept unsplit ranges. Check to see if the range needs to
-		// be spilt because of a span config.
-		needsSplit, err := confReader.NeedsSplit(ctx, repl.Desc().StartKey, repl.Desc().EndKey)
-		if err != nil {
-			log.Warningf(ctx, "unable to compute NeedsSplit, skipping: %v", err)
-			return nil
-		}
-		if needsSplit {
-			log.VEventf(ctx, 3, "split needed; skipping")
+	ctx, span := tracing.EnsureChildSpan(ctx, bq.Tracer, bq.processOpName())
+	defer span.Finish()
+
+	log.VEventf(ctx, 1, "processing replica")
+
+	// Load the system config if it's needed.
+	conf, err := bq.replicaCanBeProcessed(ctx, repl, true /* acquireLeaseIfNeeded */)
+	if err != nil {
+		if errors.Is(err, errMarkNotAcquirableLease) {
 			return nil
 		}
+		log.VErrEventf(ctx, 2, "replica cannot be processed now: %s", err)
+		return err
 	}

-	ctx, span := tracing.EnsureChildSpan(ctx, bq.Tracer, bq.processOpName())
-	defer span.Finish()
 	return timeutil.RunWithTimeout(ctx, fmt.Sprintf("%s queue process replica %d", bq.name, repl.GetRangeID()),
 		bq.processTimeoutFunc(bq.store.ClusterSettings(), repl), func(ctx context.Context) error {
-			log.VEventf(ctx, 1, "processing replica")
-
-			if !repl.IsInitialized() {
-				// We checked this when adding the replica, but we need to check it again
-				// in case this is a different replica with the same range ID (see #14193).
-				// This is possible in the case where the replica was enqueued while not
-				// having a replica ID, perhaps due to a pre-emptive snapshot, and has
-				// since been removed and re-added at a different replica ID.
-				return errors.New("cannot process uninitialized replica")
-			}
-
-			if reason, err := repl.IsDestroyed(); err != nil {
-				if !bq.queueConfig.processDestroyedReplicas || reason == destroyReasonRemoved {
-					log.VEventf(ctx, 3, "replica destroyed (%s); skipping", err)
-					return nil
-				}
-			}
-
-			// If the queue requires a replica to have the range lease in
-			// order to be processed, check whether this replica has range lease
-			// and renew or acquire if necessary.
-			if bq.needsLease {
-				if _, pErr := repl.redirectOnOrAcquireLease(ctx); pErr != nil {
-					switch v := pErr.GetDetail().(type) {
-					case *kvpb.NotLeaseHolderError, *kvpb.RangeNotFoundError:
-						log.VEventf(ctx, 3, "%s; skipping", v)
-						return nil
-					default:
-						log.VErrEventf(ctx, 2, "could not obtain lease: %s", pErr)
-						return errors.Wrapf(pErr.GoError(), "%s: could not obtain lease", repl)
-					}
-				}
-			}
-
 			log.VEventf(ctx, 3, "processing...")
 			// NB: in production code, this type assertion is always true. In tests,
 			// it may not be and shouldQueue will be passed a nil realRepl. These tests
 			// know what they're getting into so that's fine.
 			realRepl, _ := repl.(*Replica)
-			processed, err := bq.impl.process(ctx, realRepl, confReader)
+			processed, err := bq.impl.process(ctx, realRepl, conf)
 			if err != nil {
 				return err
 			}
@@ -1045,6 +966,100 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
 		})
 }

+// errMarkNotAcquirableLease is a special-case lease acquisition error for
+// cases where the lease can't be acquired.
+var errMarkNotAcquirableLease = errors.New("lease can't be acquired")
+
+// replicaCanBeProcessed validates that all the conditions for running this
+// queue are satisfied according to the queue configuration and the status of
+// the replica and its span config. This normalizes the logic for deciding
+// whether a replica can be processed. It returns an error if the replica
+// cannot be processed right now. In some cases we want to attempt to acquire
+// or renew a lease if we don't currently have it and the queue requires a
+// lease. This will only return a nil StoreReader if the queue does not
+// require span configs.
+func (bq *baseQueue) replicaCanBeProcessed(
+	ctx context.Context, repl replicaInQueue, acquireLeaseIfNeeded bool,
+) (spanconfig.StoreReader, error) {
+	if !repl.IsInitialized() {
+		// We checked this when adding the replica, but we need to check it again
+		// in case this is a different replica with the same range ID (see #14193).
+		// This is possible in the case where the replica was enqueued while not
+		// having a replica ID, perhaps due to a pre-emptive snapshot, and has
+		// since been removed and re-added at a different replica ID.
+		return nil, errors.New("cannot process uninitialized replica")
+	}
+
+	// The replica GC queue can process destroyed replicas if it is stuck in
+	// destroyReasonMergePending for too long.
+	if reason, err := repl.IsDestroyed(); err != nil {
+		if !bq.queueConfig.processDestroyedReplicas || reason == destroyReasonRemoved {
+			log.VEventf(ctx, 3, "replica destroyed (%s); skipping", err)
+			return nil, errors.Wrap(err, "cannot process destroyed replica")
+		}
+	}
+
+	// The conf is only populated if the queue requires a span config. Otherwise
+	// nil is always returned.
+	var confReader spanconfig.StoreReader
+	if bq.needsSpanConfigs {
+		var err error
+		confReader, err = bq.store.GetConfReader(ctx)
+		if err != nil {
+			if log.V(1) || !errors.Is(err, errSpanConfigsUnavailable) {
+				log.Warningf(ctx, "unable to retrieve conf reader, skipping: %v", err)
+			}
+			return nil, err
+		}
+
+		if !bq.acceptsUnsplitRanges {
+			// Queue does not accept unsplit ranges. Check to see if the range needs to
+			// be split because of a span config.
+			needsSplit, err := confReader.NeedsSplit(ctx, repl.Desc().StartKey, repl.Desc().EndKey)
+			if err != nil {
+				log.Warningf(ctx, "unable to compute NeedsSplit, skipping: %v", err)
+				return nil, err
+			}
+			if needsSplit {
+				log.VEventf(ctx, 3, "split needed; skipping")
+				return nil, errors.New("split needed; skipping")
+			}
+		}
+	}
+
+	// If the queue requires a replica to have the range lease in
+	// order to be processed, check whether this replica has the range lease
+	// and renew or acquire it if necessary.
+	if bq.needsLease {
+		if acquireLeaseIfNeeded {
+			leaseStatus, pErr := repl.redirectOnOrAcquireLease(ctx)
+			if pErr != nil {
+				switch v := pErr.GetDetail().(type) {
+				case *kvpb.NotLeaseHolderError, *kvpb.RangeNotFoundError:
+					log.VEventf(ctx, 3, "%s; skipping", v)
+					return nil, errMarkNotAcquirableLease
+				}
+				log.VErrEventf(ctx, 2, "could not obtain lease: %s", pErr)
+				return nil, errors.Wrapf(pErr.GoError(), "%s: could not obtain lease", repl)
+			}
+
+			// TODO(baptist): Should this be added to replicaInQueue?
+			realRepl, _ := repl.(*Replica)
+			pErr = realRepl.maybeSwitchLeaseType(ctx, leaseStatus)
+			if pErr != nil {
+				return nil, pErr.GoError()
+			}
+		} else {
+			// Don't process if we don't own the lease.
+			st := repl.CurrentLeaseStatus(ctx)
+			if st.IsValid() && !st.OwnedBy(repl.StoreID()) {
+				log.VEventf(ctx, 1, "needs lease; not adding: %v", st.Lease)
+				return nil, errors.Newf("needs lease, not adding: %v", st.Lease)
+			}
+		}
+	}
+	return confReader, nil
+}
+
 // IsPurgatoryError returns true iff the given error is a purgatory error.
 func IsPurgatoryError(err error) (PurgatoryError, bool) {
 	var purgErr PurgatoryError
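
One detail worth calling out from the hunks above: errMarkNotAcquirableLease is a sentinel error. processReplica matches it with errors.Is and turns a benign lease-acquisition failure into a silent skip (return nil), while every other error propagates. A self-contained sketch of that sentinel pattern, using toy names rather than the kvserver code:

package main

import (
	"errors"
	"fmt"
)

// A sentinel error: callers classify the failure with errors.Is instead
// of string matching. Toy analogue of errMarkNotAcquirableLease.
var errBenignSkip = errors.New("benign: skip this item")

func acquire(ok bool) error {
	if !ok {
		// Wrapping with %w keeps the sentinel detectable through layers.
		return fmt.Errorf("acquire failed: %w", errBenignSkip)
	}
	return nil
}

func main() {
	for _, ok := range []bool{true, false} {
		if err := acquire(ok); err != nil {
			if errors.Is(err, errBenignSkip) {
				fmt.Println("skipping quietly:", err)
				continue
			}
			fmt.Println("hard error:", err)
			continue
		}
		fmt.Println("acquired")
	}
}
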
@@ -1277,8 +1292,12 @@ func (bq *baseQueue) processReplicasInPurgatory(
 			annotatedCtx := repl.AnnotateCtx(ctx)
 			if stopper.RunTask(
 				annotatedCtx, bq.processOpName(), func(ctx context.Context) {
-					err := bq.processReplica(ctx, repl)
-					bq.finishProcessingReplica(ctx, stopper, repl, err)
+					if _, err := bq.replicaCanBeProcessed(ctx, repl, false); err != nil {
+						bq.finishProcessingReplica(ctx, stopper, repl, err)
+					} else {
+						err = bq.processReplica(ctx, repl)
+						bq.finishProcessingReplica(ctx, stopper, repl, err)
+					}
 				},
 			) != nil {
 				return
@@ -1386,18 +1405,22 @@ func (bq *baseQueue) removeFromReplicaSetLocked(rangeID roachpb.RangeID) {
 // DrainQueue locks the queue and processes the remaining queued replicas. It
 // processes the replicas in the order they're queued in, one at a time.
 // Exposed for testing only.
-func (bq *baseQueue) DrainQueue(stopper *stop.Stopper) {
+func (bq *baseQueue) DrainQueue(ctx context.Context, stopper *stop.Stopper) {
 	// Lock processing while draining. This prevents the main process
 	// loop from racing with this method and ensures that any replicas
 	// queued up when this method was called will be processed by the
 	// time it returns.
 	defer bq.lockProcessing()()

-	ctx := bq.AnnotateCtx(context.Background())
+	ctx = bq.AnnotateCtx(ctx)
 	for repl, _ := bq.pop(); repl != nil; repl, _ = bq.pop() {
 		annotatedCtx := repl.AnnotateCtx(ctx)
-		err := bq.processReplica(annotatedCtx, repl)
-		bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
+		if _, err := bq.replicaCanBeProcessed(annotatedCtx, repl, false); err != nil {
+			bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
+		} else {
+			err = bq.processReplica(annotatedCtx, repl)
+			bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
+		}
 	}
 }
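
The DrainQueue hunk above also changes how the context is obtained: instead of minting a fresh context.Background(), the method now annotates the caller's context, so cancellation and the caller's tags can reach the drain loop. A toy illustration of why that matters, with hypothetical names unrelated to kvserver:

package main

import (
	"context"
	"fmt"
)

// drain mimics the new DrainQueue shape: it inherits the caller's
// context rather than creating its own, so cancellation propagates.
func drain(ctx context.Context, items []string) {
	for _, it := range items {
		select {
		case <-ctx.Done():
			fmt.Println("drain stopped early:", ctx.Err())
			return
		default:
			fmt.Println("processing", it)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // caller gives up before the drain starts
	drain(ctx, []string{"r1", "r2"})
}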

pkg/kv/kvserver/queue_concurrency_test.go

Lines changed: 1 addition & 1 deletion
@@ -191,6 +191,6 @@ func (fr *fakeReplica) redirectOnOrAcquireLease(
 	// baseQueue only checks that the returned error is nil.
 	return kvserverpb.LeaseStatus{}, nil
 }
-func (fr *fakeReplica) LeaseStatusAt(context.Context, hlc.ClockTimestamp) kvserverpb.LeaseStatus {
+func (fr *fakeReplica) CurrentLeaseStatus(context.Context) kvserverpb.LeaseStatus {
 	return kvserverpb.LeaseStatus{}
 }

pkg/kv/kvserver/queue_helpers_testutil.go

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ func forceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) error {
 		return true
 	})

-	q.DrainQueue(s.stopper)
+	q.DrainQueue(ctx, s.stopper)
 	return nil
 }

pkg/kv/kvserver/queue_test.go

Lines changed: 2 additions & 2 deletions
@@ -1409,11 +1409,11 @@ func TestBaseQueueChangeReplicaID(t *testing.T) {
 	bq.mu.Unlock()
 	require.Equal(t, 0, testQueue.getProcessed())
 	bq.maybeAdd(ctx, r, tc.store.Clock().NowAsClockTimestamp())
-	bq.DrainQueue(tc.store.Stopper())
+	bq.DrainQueue(ctx, tc.store.Stopper())
 	require.Equal(t, 1, testQueue.getProcessed())
 	bq.maybeAdd(ctx, r, tc.store.Clock().NowAsClockTimestamp())
 	r.replicaID = 2
-	bq.DrainQueue(tc.store.Stopper())
+	bq.DrainQueue(ctx, tc.store.Stopper())
 	require.Equal(t, 1, testQueue.getProcessed())
 	require.Equal(t, 0, bq.Length())
 	require.Equal(t, 0, bq.PurgatoryLength())

pkg/kv/kvserver/replica_command.go

Lines changed: 5 additions & 1 deletion
@@ -3994,7 +3994,6 @@ func (r *Replica) adminScatter(
 	// Note that we disable lease transfers until the final step as transferring
 	// the lease prevents any further action on this node.
 	var allowLeaseTransfer bool
-	var err error
 	requeue := true
 	canTransferLease := func(ctx context.Context, repl plan.LeaseCheckReplica, conf *roachpb.SpanConfig) bool {
 		return allowLeaseTransfer
@@ -4007,6 +4006,11 @@ func (r *Replica) adminScatter(
 			allowLeaseTransfer = true
 		}
 		desc, conf := r.DescAndSpanConfig()
+		_, err := rq.replicaCanBeProcessed(ctx, r, false /* acquireLeaseIfNeeded */)
+		if err != nil {
+			// The replica cannot be processed, so skip it.
+			break
+		}
 		requeue, err = rq.processOneChange(
 			ctx, r, desc, conf, canTransferLease, true /* scatter */, false, /* dryRun */
 		)

pkg/kv/kvserver/replicate_queue.go

Lines changed: 1 addition & 0 deletions
@@ -939,6 +939,7 @@ func (rq *replicateQueue) processOneChange(
 // preProcessCheck checks the lease and destroy status of the replica. This is
 // done to ensure that the replica has a valid lease, correct lease type and is
 // not destroyed.
+// TODO(baptist): Remove this check. It is redundant with Queue.replicaCanBeProcessed.
 func (rq *replicateQueue) preProcessCheck(ctx context.Context, repl *Replica) error {
 	// Check lease and destroy status here. The queue does this higher up already, but
 	// adminScatter (and potential other future callers) also call this method and don't
