Skip to content

Commit 9096ee8

Browse files
committed
kvserver: add TestBaseQueueCallback
This commit adds TestBaseQueueCallbackOnEnqueueResult and TestBaseQueueCallbackOnProcessResult to verify that callbacks are correctly invoked with both enqueue and process results.
1 parent fbaca21 commit 9096ee8

File tree

2 files changed

+268
-0
lines changed

2 files changed

+268
-0
lines changed

pkg/kv/kvserver/queue_helpers_testutil.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ func (bq *baseQueue) testingAdd(
2222
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, noopProcessCallback)
2323
}
2424

25+
// testingAddWithCallback is the same as testingAdd, but allows the caller to
26+
// register a process callback that will be invoked when the replica is enqueued
27+
// or processed.
28+
func (bq *baseQueue) testingAddWithCallback(
29+
ctx context.Context, repl replicaInQueue, priority float64, callback processCallback,
30+
) (bool, error) {
31+
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, callback)
32+
}
33+
2534
func forceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) error {
2635
// Check that the system config is available. It is needed by many queues. If
2736
// it's not available, some queues silently fail to process any replicas,

pkg/kv/kvserver/queue_test.go

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1287,6 +1287,265 @@ func TestBaseQueueDisable(t *testing.T) {
12871287
}
12881288
}
12891289

1290+
// TestBaseQueueCallbackOnEnqueueResult tests the callback onEnqueueResult for
1291+
// 1. successful case: the replica is successfully enqueued.
1292+
// 2. priority update: updates the priority of the replica and not enqueuing
1293+
// again.
1294+
// 3. disabled: queue is disabled and the replica is not enqueued.
1295+
// 4. stopped: queue is stopped and the replica is not enqueued.
1296+
// 5. already queued: the replica is already in the queue and not enqueued
1297+
// again.
1298+
// 6. purgatory: the replica is in purgatory and not enqueued again.
1299+
// 7. processing: the replica is already being processed and not enqueued again.
1300+
// 8. full queue: the queue is full and the replica is not enqueued again.
1301+
func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
1302+
defer leaktest.AfterTest(t)()
1303+
defer log.Scope(t).Close(t)
1304+
tc := testContext{}
1305+
stopper := stop.NewStopper()
1306+
ctx := context.Background()
1307+
defer stopper.Stop(ctx)
1308+
tc.Start(ctx, t, stopper)
1309+
1310+
t.Run("successfuladd", func(t *testing.T) {
1311+
testQueue := &testQueueImpl{}
1312+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 1})
1313+
r, err := tc.store.GetReplica(1)
1314+
require.NoError(t, err)
1315+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1316+
onEnqueueResult: func(indexOnHeap int, err error) {
1317+
require.Equal(t, 0, indexOnHeap)
1318+
require.NoError(t, err)
1319+
},
1320+
onProcessResult: func(err error) {
1321+
t.Fatal("unexpected call to onProcessResult")
1322+
},
1323+
})
1324+
require.True(t, queued)
1325+
})
1326+
1327+
t.Run("priority", func(t *testing.T) {
1328+
testQueue := &testQueueImpl{}
1329+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 5})
1330+
r, err := tc.store.GetReplica(1)
1331+
require.NoError(t, err)
1332+
priorities := []float64{5.0, 4.0, 8.0, 1.0, 3.0}
1333+
expectedIndices := []int{0, 1, 0, 3, 4}
1334+
// When inserting 5, [5], index 0.
1335+
// When inserting 4, [5, 4], index 1.
1336+
// When inserting 8, [8, 4, 5], index 0.
1337+
// When inserting 1, [8, 4, 5, 1], index 3.
1338+
// When inserting 3, [8, 4, 5, 1, 3], index 4.
1339+
for i, priority := range priorities {
1340+
r.Desc().RangeID = roachpb.RangeID(i + 1)
1341+
queued, _ := bq.testingAddWithCallback(ctx, r, priority, processCallback{
1342+
onEnqueueResult: func(indexOnHeap int, err error) {
1343+
require.Equal(t, expectedIndices[i], indexOnHeap)
1344+
require.NoError(t, err)
1345+
},
1346+
onProcessResult: func(err error) {
1347+
t.Fatal("unexpected call to onProcessResult")
1348+
},
1349+
})
1350+
require.True(t, queued)
1351+
}
1352+
// Set range id back to 1.
1353+
r.Desc().RangeID = 1
1354+
})
1355+
t.Run("disabled", func(t *testing.T) {
1356+
testQueue := &testQueueImpl{}
1357+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1358+
bq.SetDisabled(true)
1359+
r, err := tc.store.GetReplica(1)
1360+
require.NoError(t, err)
1361+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1362+
onEnqueueResult: func(indexOnHeap int, err error) {
1363+
require.Equal(t, -1, indexOnHeap)
1364+
require.ErrorIs(t, err, errQueueDisabled)
1365+
},
1366+
onProcessResult: func(err error) {
1367+
t.Fatal("unexpected call to onProcessResult")
1368+
},
1369+
})
1370+
require.False(t, queued)
1371+
})
1372+
t.Run("stopped", func(t *testing.T) {
1373+
testQueue := &testQueueImpl{}
1374+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1375+
bq.mu.stopped = true
1376+
r, err := tc.store.GetReplica(1)
1377+
require.NoError(t, err)
1378+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1379+
onEnqueueResult: func(indexOnHeap int, err error) {
1380+
require.Equal(t, -1, indexOnHeap)
1381+
require.ErrorIs(t, err, errQueueStopped)
1382+
},
1383+
onProcessResult: func(err error) {
1384+
t.Fatal("unexpected call to onProcessResult")
1385+
},
1386+
})
1387+
require.False(t, queued)
1388+
})
1389+
1390+
t.Run("alreadyqueued", func(t *testing.T) {
1391+
testQueue := &testQueueImpl{}
1392+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1393+
r, err := tc.store.GetReplica(1)
1394+
require.NoError(t, err)
1395+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1396+
onEnqueueResult: func(indexOnHeap int, err error) {
1397+
require.Equal(t, 0, indexOnHeap)
1398+
require.NoError(t, err)
1399+
},
1400+
onProcessResult: func(err error) {
1401+
t.Fatal("unexpected call to onProcessResult")
1402+
},
1403+
})
1404+
require.True(t, queued)
1405+
1406+
// Inserting again on the same range id should fail.
1407+
queued, _ = bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1408+
onEnqueueResult: func(indexOnHeap int, err error) {
1409+
require.Equal(t, -1, indexOnHeap)
1410+
require.ErrorIs(t, err, errReplicaAlreadyInQueue)
1411+
},
1412+
onProcessResult: func(err error) {
1413+
t.Fatal("unexpected call to onProcessResult")
1414+
},
1415+
})
1416+
require.False(t, queued)
1417+
})
1418+
1419+
t.Run("purgatory", func(t *testing.T) {
1420+
testQueue := &testQueueImpl{
1421+
pChan: make(chan time.Time, 1),
1422+
}
1423+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1424+
r, err := tc.store.GetReplica(1)
1425+
require.NoError(t, err)
1426+
bq.mu.Lock()
1427+
bq.addToPurgatoryLocked(ctx, stopper, r, &testPurgatoryError{}, 1.0, nil)
1428+
bq.mu.Unlock()
1429+
// Inserting a range in purgatory should not enqueue again.
1430+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1431+
onEnqueueResult: func(indexOnHeap int, err error) {
1432+
require.Equal(t, -1, indexOnHeap)
1433+
require.ErrorIs(t, err, errReplicaAlreadyInPurgatory)
1434+
},
1435+
onProcessResult: func(err error) {
1436+
t.Fatal("unexpected call to onProcessResult")
1437+
},
1438+
})
1439+
require.False(t, queued)
1440+
})
1441+
1442+
t.Run("processing", func(t *testing.T) {
1443+
testQueue := &testQueueImpl{}
1444+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
1445+
r, err := tc.store.GetReplica(1)
1446+
require.NoError(t, err)
1447+
item := &replicaItem{rangeID: r.Desc().RangeID, replicaID: r.ReplicaID(), index: -1}
1448+
item.setProcessing()
1449+
bq.addLocked(item)
1450+
// Inserting a range that is already being processed should not enqueue again.
1451+
requeued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1452+
onEnqueueResult: func(indexOnHeap int, err error) {
1453+
require.Equal(t, -1, indexOnHeap)
1454+
require.ErrorIs(t, err, errReplicaAlreadyProcessing)
1455+
},
1456+
onProcessResult: func(err error) {
1457+
t.Fatal("unexpected call to onProcessResult")
1458+
},
1459+
})
1460+
require.True(t, requeued)
1461+
})
1462+
t.Run("fullqueue", func(t *testing.T) {
1463+
testQueue := &testQueueImpl{}
1464+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 0})
1465+
r, err := tc.store.GetReplica(1)
1466+
require.NoError(t, err)
1467+
// Max size is 0, so the replica should not be enqueued.
1468+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1469+
onEnqueueResult: func(indexOnHeap int, err error) {
1470+
// It may be called with err = nil.
1471+
if err != nil {
1472+
require.ErrorIs(t, err, errDroppedDueToFullQueueSize)
1473+
}
1474+
},
1475+
onProcessResult: func(err error) {
1476+
t.Fatal("unexpected call to onProcessResult")
1477+
},
1478+
})
1479+
require.True(t, queued)
1480+
})
1481+
}
1482+
1483+
// TestBaseQueueCallbackOnProcessResult tests that the processCallback is
1484+
// invoked when the replica is processed and will be invoked again if the
1485+
// replica ends up in the purgatory queue and being processed again.
1486+
func TestBaseQueueCallbackOnProcessResult(t *testing.T) {
1487+
defer leaktest.AfterTest(t)()
1488+
defer log.Scope(t).Close(t)
1489+
tc := testContext{}
1490+
stopper := stop.NewStopper()
1491+
ctx := context.Background()
1492+
defer stopper.Stop(ctx)
1493+
tsc := TestStoreConfig(nil)
1494+
tc.StartWithStoreConfig(ctx, t, stopper, tsc)
1495+
1496+
testQueue := &testQueueImpl{
1497+
duration: time.Nanosecond,
1498+
pChan: make(chan time.Time, 1),
1499+
err: &testPurgatoryError{},
1500+
}
1501+
1502+
const replicaCount = 10
1503+
repls := createReplicas(t, &tc, replicaCount)
1504+
1505+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: replicaCount})
1506+
bq.Start(stopper)
1507+
1508+
var totalProcessedCalledWithErr atomic.Int32
1509+
for _, r := range repls {
1510+
queued, _ := bq.testingAddWithCallback(context.Background(), r, 1.0, processCallback{
1511+
onEnqueueResult: func(indexOnHeap int, err error) {
1512+
require.NoError(t, err)
1513+
},
1514+
onProcessResult: func(err error) {
1515+
if err != nil {
1516+
totalProcessedCalledWithErr.Add(1)
1517+
}
1518+
},
1519+
})
1520+
require.True(t, queued)
1521+
}
1522+
1523+
testutils.SucceedsSoon(t, func() error {
1524+
if pc := testQueue.getProcessed(); pc != replicaCount {
1525+
return errors.Errorf("expected %d processed replicas; got %d", replicaCount, pc)
1526+
}
1527+
1528+
if totalProcessedCalledWithErr.Load() != int32(replicaCount) {
1529+
return errors.Errorf("expected %d processed replicas with err; got %d", replicaCount, totalProcessedCalledWithErr.Load())
1530+
}
1531+
return nil
1532+
})
1533+
1534+
// Now, signal that purgatoried replicas should retry.
1535+
testQueue.pChan <- timeutil.Now()
1536+
1537+
testutils.SucceedsSoon(t, func() error {
1538+
if pc := testQueue.getProcessed(); pc != replicaCount*2 {
1539+
return errors.Errorf("expected %d processed replicas; got %d", replicaCount, pc)
1540+
}
1541+
1542+
if totalProcessedCalledWithErr.Load() != int32(replicaCount*2) {
1543+
return errors.Errorf("expected %d processed replicas with err; got %d", replicaCount, totalProcessedCalledWithErr.Load())
1544+
}
1545+
return nil
1546+
})
1547+
}
1548+
12901549
// TestQueueDisable verifies that setting the set of queue.enabled cluster
12911550
// settings actually disables the base queue. This test works alongside
12921551
// TestBaseQueueDisable to verify the entire disable workflow.

0 commit comments

Comments
 (0)