
Commit 6882d94

Merge pull request #82680 from aayushshah15/backport22.1-80993
2 parents 792417a + 66570f2 commit 6882d94

23 files changed: +366 / -72 lines changed

pkg/ccl/backupccl/testutils.go

Lines changed: 3 additions & 3 deletions
@@ -28,7 +28,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/keys"
   "github.com/cockroachdb/cockroach/pkg/kv"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
-  roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
+  "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
   "github.com/cockroachdb/cockroach/pkg/testutils"
   "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -567,7 +567,7 @@ WHERE start_pretty LIKE '%s' ORDER BY start_key ASC`, startPretty)).Scan(&startK
   lhServer := tc.Server(int(l.Replica.NodeID) - 1)
   s, repl := getFirstStoreReplica(t, lhServer, startKey)
   testutils.SucceedsSoon(t, func() error {
-    trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
+    trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
     require.NoError(t, err)
     return checkGCTrace(trace.String())
   })
@@ -602,7 +602,7 @@ ORDER BY start_key ASC`, tableName, databaseName).Scan(&startKey)
   lhServer := tc.Server(int(l.Replica.NodeID) - 1)
   s, repl := getFirstStoreReplica(t, lhServer, startKey)
   testutils.SucceedsSoon(t, func() error {
-    trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
+    trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
     require.NoError(t, err)
     return checkGCTrace(trace.String())
   })
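
The call-site change in this file is the pattern that repeats through the rest of the commit: `ManuallyEnqueue` is renamed to `Enqueue`, which takes an additional `async` argument. A minimal sketch of the migration, with `ctx`, `store`, `repl`, `skipShouldQueue`, and `checkGCTrace` standing in for whatever the surrounding test already defines:

// Old call shape, removed by this commit:
//   trace, processErr, err := store.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
//
// New call shape: the async choice is now an explicit argument. Every call site
// touched by this commit passes false /* async */, which keeps the call
// synchronous so the returned trace and processing error can still be inspected.
trace, processErr, err := store.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
if err != nil {
  return err // enqueueing the replica failed
}
if processErr != nil {
  return processErr // the queue processed the replica but hit an error
}
return checkGCTrace(trace.String()) // assert on the collected trace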

pkg/ccl/changefeedccl/helpers_tenant_shim_test.go

Lines changed: 3 additions & 0 deletions
@@ -89,6 +89,9 @@ func (t *testServerShim) Decommission(
 ) error {
   panic(unsupportedShimMethod)
 }
+func (t *testServerShim) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
+  panic(unsupportedShimMethod)
+}
 func (t *testServerShim) SplitRange(
   splitKey roachpb.Key,
 ) (left roachpb.RangeDescriptor, right roachpb.RangeDescriptor, err error) {

pkg/ccl/multiregionccl/datadriven_test.go

Lines changed: 3 additions & 2 deletions
@@ -311,8 +311,9 @@ func TestMultiRegionDataDriven(t *testing.T) {
     return errors.New(`could not find replica`)
   }
   for _, queueName := range []string{"split", "replicate", "raftsnapshot"} {
-    _, processErr, err := store.ManuallyEnqueue(ctx, queueName, repl,
-      true /* skipShouldQueue */)
+    _, processErr, err := store.Enqueue(
+      ctx, queueName, repl, true /* skipShouldQueue */, false, /* async */
+    )
     if processErr != nil {
       return processErr
     }

pkg/kv/kvserver/client_lease_test.go

Lines changed: 4 additions & 2 deletions
@@ -839,7 +839,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
     return nil
   })
   _, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
-    ManuallyEnqueue(ctx, "replicate", repl, true)
+    Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)

   require.NoError(t, enqueueError)

@@ -1038,7 +1038,9 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {
       repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
       require.NotNil(t, repl)
       // We don't know who the leaseholder might be, so ignore errors.
-      _, _, _ = tc.GetFirstStoreFromServer(t, i).ManuallyEnqueue(ctx, "replicate", repl, true)
+      _, _, _ = tc.GetFirstStoreFromServer(t, i).Enqueue(
+        ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+      )
     }
   }


pkg/kv/kvserver/client_migration_test.go

Lines changed: 1 addition & 1 deletion
@@ -180,7 +180,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
   repl, err := store.GetReplica(desc.RangeID)
   require.NoError(t, err)
   testutils.SucceedsSoon(t, func() error {
-    trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
+    trace, processErr, err := store.Enqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false /* async */)
     if err != nil {
       return err
     }

pkg/kv/kvserver/client_protectedts_test.go

Lines changed: 4 additions & 4 deletions
@@ -156,7 +156,7 @@ func TestProtectedTimestamps(t *testing.T) {
   testutils.SucceedsSoon(t, func() error {
     upsertUntilBackpressure()
     s, repl := getStoreAndReplica()
-    trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
+    trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
     require.NoError(t, err)
     if !processedRegexp.MatchString(trace.String()) {
       return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
@@ -200,13 +200,13 @@ func TestProtectedTimestamps(t *testing.T) {
   s, repl := getStoreAndReplica()
   // The protectedts record will prevent us from aging the MVCC garbage bytes
   // past the oldest record so shouldQueue should be false. Verify that.
-  trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */)
+  trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
   require.NoError(t, err)
   require.Regexp(t, "(?s)shouldQueue=false", trace.String())

   // If we skipShouldQueue then gc will run but it should only run up to the
   // timestamp of our record at the latest.
-  trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */)
+  trace, _, err = s.Enqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */, false /* async */)
   require.NoError(t, err)
   require.Regexp(t, "(?s)done with GC evaluation for 0 keys", trace.String())
   thresh := thresholdFromTrace(trace)
@@ -258,7 +258,7 @@ func TestProtectedTimestamps(t *testing.T) {
   // happens up to the protected timestamp of the new record.
   require.NoError(t, ptsWithDB.Release(ctx, nil, ptsRec.ID.GetUUID()))
   testutils.SucceedsSoon(t, func() error {
-    trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
+    trace, _, err = s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
     require.NoError(t, err)
     if !processedRegexp.MatchString(trace.String()) {
       return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)

pkg/kv/kvserver/liveness/liveness.go

Lines changed: 33 additions & 17 deletions
@@ -196,11 +196,12 @@ type NodeLiveness struct {
   // heartbeatPaused contains an atomically-swapped number representing a bool
   // (1 or 0). heartbeatToken is a channel containing a token which is taken
   // when heartbeating or when pausing the heartbeat. Used for testing.
-  heartbeatPaused      uint32
-  heartbeatToken       chan struct{}
-  metrics              Metrics
-  onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
-  engineSyncs          singleflight.Group
+  heartbeatPaused       uint32
+  heartbeatToken        chan struct{}
+  metrics               Metrics
+  onNodeDecommissioned  func(livenesspb.Liveness) // noop if nil
+  onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
+  engineSyncs           singleflight.Group

   mu struct {
     syncutil.RWMutex
@@ -279,24 +280,28 @@ type NodeLivenessOptions struct {
   // idempotent as it may be invoked multiple times and defaults to a
   // noop.
   OnNodeDecommissioned func(livenesspb.Liveness)
+  // OnNodeDecommissioning is invoked when a node is detected to be
+  // decommissioning.
+  OnNodeDecommissioning OnNodeDecommissionCallback
 }

 // NewNodeLiveness returns a new instance of NodeLiveness configured
 // with the specified gossip instance.
 func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
   nl := &NodeLiveness{
-    ambientCtx:           opts.AmbientCtx,
-    stopper:              opts.Stopper,
-    clock:                opts.Clock,
-    db:                   opts.DB,
-    gossip:               opts.Gossip,
-    livenessThreshold:    opts.LivenessThreshold,
-    renewalDuration:      opts.RenewalDuration,
-    selfSem:              make(chan struct{}, 1),
-    st:                   opts.Settings,
-    otherSem:             make(chan struct{}, 1),
-    heartbeatToken:       make(chan struct{}, 1),
-    onNodeDecommissioned: opts.OnNodeDecommissioned,
+    ambientCtx:            opts.AmbientCtx,
+    stopper:               opts.Stopper,
+    clock:                 opts.Clock,
+    db:                    opts.DB,
+    gossip:                opts.Gossip,
+    livenessThreshold:     opts.LivenessThreshold,
+    renewalDuration:       opts.RenewalDuration,
+    selfSem:               make(chan struct{}, 1),
+    st:                    opts.Settings,
+    otherSem:              make(chan struct{}, 1),
+    heartbeatToken:        make(chan struct{}, 1),
+    onNodeDecommissioned:  opts.OnNodeDecommissioned,
+    onNodeDecommissioning: opts.OnNodeDecommissioning,
   }
   nl.metrics = Metrics{
     LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
@@ -696,6 +701,10 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
     !liveness.Draining
 }

+// OnNodeDecommissionCallback is a callback that is invoked when a node is
+// detected to be decommissioning.
+type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)
+
 // NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
 type NodeLivenessStartOptions struct {
   Engines []storage.Engine
@@ -1397,6 +1406,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)

   var shouldReplace bool
   nl.mu.Lock()
+
+  // NB: shouldReplace will always be true right after a node restarts since the
+  // `nodes` map will be empty. This means that the callbacks called below will
+  // always be invoked at least once after node restarts.
   oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID)
   if !ok {
     shouldReplace = true
@@ -1424,6 +1437,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
   if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
     nl.onNodeDecommissioned(newLivenessRec.Liveness)
   }
+  if newLivenessRec.Membership.Decommissioning() && nl.onNodeDecommissioning != nil {
+    nl.onNodeDecommissioning(newLivenessRec.NodeID)
+  }
 }

 // shouldReplaceLiveness checks to see if the new liveness is in fact newer
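
The liveness changes above add a decommissioning hook next to the existing decommissioned one: `NodeLivenessOptions.OnNodeDecommissioning` is stored on `NodeLiveness` and fired from `maybeUpdate` whenever an incoming liveness record reports a node as decommissioning. A minimal sketch of how a caller could supply both callbacks; the other required options are elided, and the actual server-side wiring presumably lives in other files of this commit, not shown in this section:

nl := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{
  // ... existing options (AmbientCtx, Stopper, Clock, DB, Gossip, Settings,
  // LivenessThreshold, RenewalDuration, ...) elided here ...
  OnNodeDecommissioned: func(l livenesspb.Liveness) {
    // Fired when a node is observed to be fully decommissioned.
  },
  OnNodeDecommissioning: func(id roachpb.NodeID) {
    // Fired when a node is observed to be decommissioning. Per the NB comment
    // added in maybeUpdate, this fires again after a restart, so it can be
    // invoked more than once for the same node.
  },
})
_ = nl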

pkg/kv/kvserver/replica_learner_test.go

Lines changed: 30 additions & 10 deletions
@@ -430,8 +430,12 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
   // Manually enqueue the leaseholder replica into its store's raft snapshot
   // queue. We expect it to pick up on the fact that the non-voter on its range
   // needs a snapshot.
-  recording, pErr, err := leaseholderStore.ManuallyEnqueue(
-    ctx, "raftsnapshot", leaseholderRepl, false, /* skipShouldQueue */
+  recording, pErr, err := leaseholderStore.Enqueue(
+    ctx,
+    "raftsnapshot",
+    leaseholderRepl,
+    false, /* skipShouldQueue */
+    false, /* async */
   )
   if pErr != nil {
     return pErr
@@ -582,7 +586,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
   store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
   {
     require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
-    _, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+    _, processErr, err := store.Enqueue(
+      ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+    )
     require.NoError(t, err)
     require.NoError(t, processErr)
     require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
@@ -600,7 +606,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
   ltk.withStopAfterJointConfig(func() {
     desc := tc.RemoveVotersOrFatal(t, scratchStartKey, tc.Target(2))
     require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
-    trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+    trace, processErr, err := store.Enqueue(
+      ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+    )
     require.NoError(t, err)
     require.NoError(t, processErr)
     formattedTrace := trace.String()
@@ -639,7 +647,9 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) {
   // Run the replicaGC queue.
   checkNoGC := func() roachpb.RangeDescriptor {
     store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
-    trace, processErr, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */)
+    trace, processErr, err := store.Enqueue(
+      ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
+    )
     require.NoError(t, err)
     require.NoError(t, processErr)
     const msg = `not gc'able, replica is still in range descriptor: (n2,s2):`
@@ -699,7 +709,9 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
   // raft to figure out that the replica needs a snapshot.
   store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
   testutils.SucceedsSoon(t, func() error {
-    trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
+    trace, processErr, err := store.Enqueue(
+      ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false, /* async */
+    )
     if err != nil {
       return err
     }
@@ -835,7 +847,9 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
   queue1ErrCh := make(chan error, 1)
   go func() {
     queue1ErrCh <- func() error {
-      trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+      trace, processErr, err := store.Enqueue(
+        ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+      )
       if err != nil {
         return err
       }
@@ -1233,7 +1247,9 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
     // ensure that the merge correctly notices that there is a snapshot in
     // flight and ignores the range.
     store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
-    _, processErr, enqueueErr := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+    _, processErr, enqueueErr := store.Enqueue(
+      ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
+    )
     require.NoError(t, enqueueErr)
     require.True(t, kvserver.IsReplicationChangeInProgressError(processErr))
     return nil
@@ -1278,7 +1294,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
   })

   store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
-  trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+  trace, processErr, err := store.Enqueue(
+    ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
+  )
   require.NoError(t, err)
   require.NoError(t, processErr)
   formattedTrace := trace.String()
@@ -1313,7 +1331,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
   checkTransitioningOut := func() {
     t.Helper()
     store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
-    trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+    trace, processErr, err := store.Enqueue(
+      ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
+    )
     require.NoError(t, err)
     require.NoError(t, processErr)
     formattedTrace := trace.String()

pkg/kv/kvserver/replicate_queue_test.go

Lines changed: 2 additions & 5 deletions
@@ -1204,11 +1204,8 @@ func TestReplicateQueueShouldQueueNonVoter(t *testing.T) {
   // because we know that it is the leaseholder (since it is the only voting
   // replica).
   store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
-  recording, processErr, err := store.ManuallyEnqueue(
-    ctx,
-    "replicate",
-    repl,
-    false, /* skipShouldQueue */
+  recording, processErr, err := store.Enqueue(
+    ctx, "replicate", repl, false /* skipShouldQueue */, false, /* async */
   )
   if err != nil {
     log.Errorf(ctx, "err: %s", err.Error())

pkg/kv/kvserver/scanner.go

Lines changed: 3 additions & 0 deletions
@@ -35,6 +35,9 @@ type replicaQueue interface {
   // the queue's inclusion criteria and the queue is not already
   // too full, etc.
   MaybeAddAsync(context.Context, replicaInQueue, hlc.ClockTimestamp)
+  // AddAsync adds the replica to the queue without checking whether the replica
+  // meets the queue's inclusion criteria.
+  AddAsync(context.Context, replicaInQueue, float64)
   // MaybeRemove removes the replica from the queue if it is present.
   MaybeRemove(roachpb.RangeID)
   // Name returns the name of the queue.
