Skip to content

Commit ca59db4

Browse files
committed
kvserver: add an async parameter to Store.ManuallyEnqueue()
Release note: None
1 parent 1ee37de commit ca59db4

File tree

15 files changed

+109
-48
lines changed

15 files changed

+109
-48
lines changed

pkg/ccl/backupccl/utils_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/cockroachdb/cockroach/pkg/keys"
3232
"github.com/cockroachdb/cockroach/pkg/kv"
3333
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
34-
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
34+
"github.com/cockroachdb/cockroach/pkg/roachpb"
3535
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
3636
"github.com/cockroachdb/cockroach/pkg/testutils"
3737
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -565,7 +565,7 @@ WHERE start_pretty LIKE '%s' ORDER BY start_key ASC`, startPretty)).Scan(&startK
565565
lhServer := tc.Server(int(l.Replica.NodeID) - 1)
566566
s, repl := getFirstStoreReplica(t, lhServer, startKey)
567567
testutils.SucceedsSoon(t, func() error {
568-
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
568+
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
569569
require.NoError(t, err)
570570
return checkGCTrace(trace.String())
571571
})

pkg/ccl/multiregionccl/datadriven_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,9 @@ SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.5s'
316316
return errors.New(`could not find replica`)
317317
}
318318
for _, queueName := range []string{"split", "replicate", "raftsnapshot"} {
319-
_, processErr, err := store.ManuallyEnqueue(ctx, queueName, repl,
320-
true /* skipShouldQueue */)
319+
_, processErr, err := store.Enqueue(
320+
ctx, queueName, repl, true /* skipShouldQueue */, false, /* async */
321+
)
321322
if processErr != nil {
322323
return processErr
323324
}

pkg/kv/kvserver/client_lease_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
708708
return nil
709709
})
710710
_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
711-
ManuallyEnqueue(ctx, "replicate", repl, true)
711+
Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
712712

713713
require.NoError(t, enqueueError)
714714

@@ -907,7 +907,9 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {
907907
repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
908908
require.NotNil(t, repl)
909909
// We don't know who the leaseholder might be, so ignore errors.
910-
_, _, _ = tc.GetFirstStoreFromServer(t, i).ManuallyEnqueue(ctx, "replicate", repl, true)
910+
_, _, _ = tc.GetFirstStoreFromServer(t, i).Enqueue(
911+
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
912+
)
911913
}
912914
}
913915

pkg/kv/kvserver/client_migration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
180180
repl, err := store.GetReplica(desc.RangeID)
181181
require.NoError(t, err)
182182
testutils.SucceedsSoon(t, func() error {
183-
trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
183+
trace, processErr, err := store.Enqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false /* async */)
184184
if err != nil {
185185
return err
186186
}

pkg/kv/kvserver/client_protectedts_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestProtectedTimestamps(t *testing.T) {
156156
testutils.SucceedsSoon(t, func() error {
157157
upsertUntilBackpressure()
158158
s, repl := getStoreAndReplica()
159-
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
159+
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
160160
require.NoError(t, err)
161161
if !processedRegexp.MatchString(trace.String()) {
162162
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
@@ -200,13 +200,13 @@ func TestProtectedTimestamps(t *testing.T) {
200200
s, repl := getStoreAndReplica()
201201
// The protectedts record will prevent us from aging the MVCC garbage bytes
202202
// past the oldest record so shouldQueue should be false. Verify that.
203-
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */)
203+
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
204204
require.NoError(t, err)
205205
require.Regexp(t, "(?s)shouldQueue=false", trace.String())
206206

207207
// If we skipShouldQueue then gc will run but it should only run up to the
208208
// timestamp of our record at the latest.
209-
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */)
209+
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */, false /* async */)
210210
require.NoError(t, err)
211211
require.Regexp(t, "(?s)done with GC evaluation for 0 keys", trace.String())
212212
thresh := thresholdFromTrace(trace)
@@ -258,7 +258,7 @@ func TestProtectedTimestamps(t *testing.T) {
258258
// happens up to the protected timestamp of the new record.
259259
require.NoError(t, ptsWithDB.Release(ctx, nil, ptsRec.ID.GetUUID()))
260260
testutils.SucceedsSoon(t, func() error {
261-
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
261+
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
262262
require.NoError(t, err)
263263
if !processedRegexp.MatchString(trace.String()) {
264264
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)

pkg/kv/kvserver/replica_learner_test.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,12 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
555555
// Manually enqueue the leaseholder replica into its store's raft snapshot
556556
// queue. We expect it to pick up on the fact that the non-voter on its range
557557
// needs a snapshot.
558-
recording, pErr, err := leaseholderStore.ManuallyEnqueue(
559-
ctx, "raftsnapshot", leaseholderRepl, false, /* skipShouldQueue */
558+
recording, pErr, err := leaseholderStore.Enqueue(
559+
ctx,
560+
"raftsnapshot",
561+
leaseholderRepl,
562+
false, /* skipShouldQueue */
563+
false, /* async */
560564
)
561565
if pErr != nil {
562566
return pErr
@@ -751,7 +755,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
751755
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
752756
{
753757
require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
754-
_, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
758+
_, processErr, err := store.Enqueue(
759+
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
760+
)
755761
require.NoError(t, err)
756762
require.NoError(t, processErr)
757763
require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
@@ -769,7 +775,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
769775
ltk.withStopAfterJointConfig(func() {
770776
desc := tc.RemoveVotersOrFatal(t, scratchStartKey, tc.Target(2))
771777
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
772-
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
778+
trace, processErr, err := store.Enqueue(
779+
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
780+
)
773781
require.NoError(t, err)
774782
require.NoError(t, processErr)
775783
formattedTrace := trace.String()
@@ -808,7 +816,9 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) {
808816
// Run the replicaGC queue.
809817
checkNoGC := func() roachpb.RangeDescriptor {
810818
store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
811-
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */)
819+
trace, processErr, err := store.Enqueue(
820+
ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
821+
)
812822
require.NoError(t, err)
813823
require.NoError(t, processErr)
814824
const msg = `not gc'able, replica is still in range descriptor: (n2,s2):`
@@ -868,7 +878,9 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
868878
// raft to figure out that the replica needs a snapshot.
869879
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
870880
testutils.SucceedsSoon(t, func() error {
871-
trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
881+
trace, processErr, err := store.Enqueue(
882+
ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false, /* async */
883+
)
872884
if err != nil {
873885
return err
874886
}
@@ -1004,7 +1016,9 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
10041016
queue1ErrCh := make(chan error, 1)
10051017
go func() {
10061018
queue1ErrCh <- func() error {
1007-
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
1019+
trace, processErr, err := store.Enqueue(
1020+
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
1021+
)
10081022
if err != nil {
10091023
return err
10101024
}
@@ -1484,7 +1498,9 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
14841498
// ensure that the merge correctly notices that there is a snapshot in
14851499
// flight and ignores the range.
14861500
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
1487-
_, processErr, enqueueErr := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
1501+
_, processErr, enqueueErr := store.Enqueue(
1502+
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
1503+
)
14881504
require.NoError(t, enqueueErr)
14891505
require.True(t, kvserver.IsReplicationChangeInProgressError(processErr))
14901506
return nil
@@ -1529,7 +1545,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
15291545
})
15301546

15311547
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
1532-
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
1548+
trace, processErr, err := store.Enqueue(
1549+
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
1550+
)
15331551
require.NoError(t, err)
15341552
require.NoError(t, processErr)
15351553
formattedTrace := trace.String()
@@ -1564,7 +1582,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
15641582
checkTransitioningOut := func() {
15651583
t.Helper()
15661584
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
1567-
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
1585+
trace, processErr, err := store.Enqueue(
1586+
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
1587+
)
15681588
require.NoError(t, err)
15691589
require.NoError(t, processErr)
15701590
formattedTrace := trace.String()

pkg/kv/kvserver/replicate_queue_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,11 +1204,8 @@ func TestReplicateQueueShouldQueueNonVoter(t *testing.T) {
12041204
// because we know that it is the leaseholder (since it is the only voting
12051205
// replica).
12061206
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
1207-
recording, processErr, err := store.ManuallyEnqueue(
1208-
ctx,
1209-
"replicate",
1210-
repl,
1211-
false, /* skipShouldQueue */
1207+
recording, processErr, err := store.Enqueue(
1208+
ctx, "replicate", repl, false /* skipShouldQueue */, false, /* async */
12121209
)
12131210
if err != nil {
12141211
log.Errorf(ctx, "err: %s", err.Error())

pkg/kv/kvserver/scanner.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ type replicaQueue interface {
3535
// the queue's inclusion criteria and the queue is not already
3636
// too full, etc.
3737
MaybeAddAsync(context.Context, replicaInQueue, hlc.ClockTimestamp)
38+
// AddAsync adds the replica to the queue without checking whether the replica
39+
// meets the queue's inclusion criteria.
40+
AddAsync(context.Context, replicaInQueue, float64)
3841
// MaybeRemove removes the replica from the queue if it is present.
3942
MaybeRemove(roachpb.RangeID)
4043
// Name returns the name of the queue.

pkg/kv/kvserver/scanner_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,17 @@ func (tq *testQueue) MaybeAddAsync(
159159
}
160160
}
161161

162+
// NB: AddAsync on a testQueue is actually synchronous.
163+
func (tq *testQueue) AddAsync(ctx context.Context, replI replicaInQueue, prio float64) {
164+
repl := replI.(*Replica)
165+
166+
tq.Lock()
167+
defer tq.Unlock()
168+
if index := tq.indexOf(repl.RangeID); index == -1 {
169+
tq.ranges = append(tq.ranges, repl)
170+
}
171+
}
172+
162173
func (tq *testQueue) MaybeRemove(rangeID roachpb.RangeID) {
163174
tq.Lock()
164175
defer tq.Unlock()

pkg/kv/kvserver/store.go

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ import (
8484
"github.com/cockroachdb/errors"
8585
"github.com/cockroachdb/logtags"
8686
"github.com/cockroachdb/redact"
87-
raft "go.etcd.io/etcd/raft/v3"
87+
"go.etcd.io/etcd/raft/v3"
8888
"golang.org/x/time/rate"
8989
)
9090

@@ -3398,12 +3398,14 @@ func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.R
33983398
return collectAndFinish(), nil
33993399
}
34003400

3401-
// ManuallyEnqueue runs the given replica through the requested queue,
3402-
// returning all trace events collected along the way as well as the error
3403-
// message returned from the queue's process method, if any. Intended to help
3404-
// power an admin debug endpoint.
3405-
func (s *Store) ManuallyEnqueue(
3406-
ctx context.Context, queueName string, repl *Replica, skipShouldQueue bool,
3401+
// Enqueue runs the given replica through the requested queue. If `async` is
3402+
// specified, the replica is enqueued into the requested queue for asynchronous
3403+
// processing and this method returns nil results. Otherwise, it returns all trace
3404+
// events collected along the way as well as the error message returned from the
3405+
// queue's process method, if any. Intended to help power the
3406+
// server.decommissionMonitor and an admin debug endpoint.
3407+
func (s *Store) Enqueue(
3408+
ctx context.Context, queueName string, repl *Replica, skipShouldQueue bool, async bool,
34073409
) (recording tracingpb.Recording, processError error, enqueueError error) {
34083410
ctx = repl.AnnotateCtx(ctx)
34093411

@@ -3414,12 +3416,14 @@ func (s *Store) ManuallyEnqueue(
34143416
return nil, nil, errors.Errorf("not enqueueing uninitialized replica %s", repl)
34153417
}
34163418

3417-
var queue queueImpl
3419+
var queue replicaQueue
3420+
var qImpl queueImpl
34183421
var needsLease bool
3419-
for _, replicaQueue := range s.scanner.queues {
3420-
if strings.EqualFold(replicaQueue.Name(), queueName) {
3421-
queue = replicaQueue.(queueImpl)
3422-
needsLease = replicaQueue.NeedsLease()
3422+
for _, q := range s.scanner.queues {
3423+
if strings.EqualFold(q.Name(), queueName) {
3424+
queue = q
3425+
qImpl = q.(queueImpl)
3426+
needsLease = q.NeedsLease()
34233427
}
34243428
}
34253429
if queue == nil {
@@ -3441,21 +3445,32 @@ func (s *Store) ManuallyEnqueue(
34413445
}
34423446
}
34433447

3448+
if async {
3449+
// NB: 1e6 is a placeholder for now. We want to use a high enough priority
3450+
// to ensure that these replicas are priority-ordered first.
3451+
if skipShouldQueue {
3452+
queue.AddAsync(ctx, repl, 1e6 /* prio */)
3453+
} else {
3454+
queue.MaybeAddAsync(ctx, repl, repl.Clock().NowAsClockTimestamp())
3455+
}
3456+
return nil, nil, nil
3457+
}
3458+
34443459
ctx, collectAndFinish := tracing.ContextWithRecordingSpan(
34453460
ctx, s.cfg.AmbientCtx.Tracer, fmt.Sprintf("manual %s queue run", queueName))
34463461
defer collectAndFinish()
34473462

34483463
if !skipShouldQueue {
34493464
log.Eventf(ctx, "running %s.shouldQueue", queueName)
3450-
shouldQueue, priority := queue.shouldQueue(ctx, s.cfg.Clock.NowAsClockTimestamp(), repl, confReader)
3465+
shouldQueue, priority := qImpl.shouldQueue(ctx, s.cfg.Clock.NowAsClockTimestamp(), repl, confReader)
34513466
log.Eventf(ctx, "shouldQueue=%v, priority=%f", shouldQueue, priority)
34523467
if !shouldQueue {
34533468
return collectAndFinish(), nil, nil
34543469
}
34553470
}
34563471

34573472
log.Eventf(ctx, "running %s.process", queueName)
3458-
processed, processErr := queue.process(ctx, repl, confReader)
3473+
processed, processErr := qImpl.process(ctx, repl, confReader)
34593474
log.Eventf(ctx, "processed: %t (err: %v)", processed, processErr)
34603475
return collectAndFinish(), processErr, nil
34613476
}

0 commit comments

Comments
 (0)