2 changes: 1 addition & 1 deletion pkg/kv/kvserver/mvcc_gc_queue.go
@@ -878,7 +878,7 @@ func (mgcq *mvccGCQueue) scanReplicasForHiPriGCHints(
if !isLeaseHolder {
return true
}
added, _ := mgcq.addInternal(ctx, desc, replica.ReplicaID(), deleteRangePriority)
added, _ := mgcq.addInternal(ctx, desc, replica.ReplicaID(), deleteRangePriority, noopProcessCallback)
if added {
mgcq.store.metrics.GCEnqueueHighPriority.Inc(1)
foundReplicas++
190 changes: 156 additions & 34 deletions pkg/kv/kvserver/queue.go
@@ -112,9 +112,57 @@ type PurgatoryError interface {
PurgatoryErrorMarker() // dummy method for unique interface
}

// processCallback is a hook that is called when a replica finishes processing.
// It is called with the result of the process attempt.
type processCallback func(error)
// noopProcessCallback is a processCallback that does nothing.
var noopProcessCallback = processCallback{
onProcessResult: func(err error) {},
onEnqueueResult: func(indexOnHeap int, err error) {},
}

// processCallback is a hook that is called when a replica is enqueued or
// finishes processing.
//
// NB: None of the fields below can be nil. Use noopProcessCallback if you do
// not need to register any callback.
//
// NB: These callbacks may be called multiple times:
// 1. onEnqueueResult may be called with error = nil first and called again with
// error = errDroppedDueToFullQueueSize when the replicaItem is later dropped
// before processing due to exceeding max queue size.
// 2. onProcessResult may be called first with a processing error when the
// replica is sent to the purgatory queue, and called again when the purgatory
// processes the replica.
//
// NB: There is no strong guarantee that the callbacks will be executed, since
// removeLocked or removeFromReplicaSetLocked may be called without executing
// them. That happens when the replica is destroyed or recreated with a new
// replica ID.
//
// For now, the two use cases (decommissioning nudger and
// maybeBackpressureBatch) are okay with this behavior. But adding new uses is
// discouraged without cleaning up the contract of processCallback.
// TODO(wenyihu6): consider cleaning up the semantics after backports.
type processCallback struct {
// onProcessResult is called with the result of a process attempt. It is only
// invoked if the base queue gets a chance to process this replica. It may be
// invoked multiple times: first with a processing error and again with the
// purgatory processing error.
onProcessResult func(err error)

// onEnqueueResult is called with the result of the enqueue attempt. It is
// invoked when the range is added to the queue, and again if the range
// encounters an error before it gets a chance to be popped off the queue and
// processed.
//
// This may be invoked multiple times: first with error = nil when the range
// is successfully enqueued, and again with an error if the replica later
// encounters one.
//
// If error is nil, the index on the priority queue where this item sits is
// also passed in the callback. If error is non-nil, the index passed in the
// callback is -1. Note: indexOnHeap does not represent the item's exact rank
// by priority. It only reflects the item's position in the heap array, which
// gives a rough idea of where it sits in the priority hierarchy.
onEnqueueResult func(indexOnHeap int, err error)
}
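
To make the multi-invocation contract above concrete, here is a minimal,
self-contained sketch (not part of this diff) of a callback that tolerates
repeated invocations by keeping only the first enqueue failure and the latest
process result. The processCallback struct is re-declared locally so the
snippet builds on its own; waitableCallback and newWaitableCallback are
illustrative names that do not exist in this package.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// processCallback mirrors the struct introduced in this change; it is
// re-declared here only so the sketch compiles on its own.
type processCallback struct {
	onProcessResult func(err error)
	onEnqueueResult func(indexOnHeap int, err error)
}

// waitableCallback records the first enqueue failure and the most recent
// process result, tolerating hooks that fire more than once.
type waitableCallback struct {
	mu         sync.Mutex
	enqueueErr error
	processed  chan error // holds the most recent process result
}

func newWaitableCallback() (*waitableCallback, processCallback) {
	w := &waitableCallback{processed: make(chan error, 1)}
	cb := processCallback{
		onEnqueueResult: func(indexOnHeap int, err error) {
			w.mu.Lock()
			defer w.mu.Unlock()
			// Keep only the first failure; a later nil must not clear it.
			if err != nil && w.enqueueErr == nil {
				w.enqueueErr = err
			}
		},
		onProcessResult: func(err error) {
			// Drain any earlier result so the channel always holds the latest
			// one, e.g. a purgatory retry overwriting the initial purgatory
			// error.
			select {
			case <-w.processed:
			default:
			}
			w.processed <- err
		},
	}
	return w, cb
}

func main() {
	w, cb := newWaitableCallback()
	cb.onEnqueueResult(3, nil)                        // enqueued, heap position 3
	cb.onProcessResult(errors.New("purgatory error")) // first attempt fails
	cb.onProcessResult(nil)                           // purgatory retry succeeds
	fmt.Println(w.enqueueErr, <-w.processed)          // prints: <nil> <nil>
}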

// A replicaItem holds a replica and metadata about its queue state and
// processing state.
@@ -145,8 +193,15 @@ func (i *replicaItem) setProcessing() {
i.processing = true
}

// registerCallback adds a new callback to be executed when the replicaItem
// finishes processing.
// registerCallback adds a new callback to be executed when the replicaItem is
// enqueued or finishes processing. There are two places where the callback
// may be registered:
// 1. bq.MaybeAddCallback: registers the callback if the replicaItem has
// already been added to bq.mu.replicas.
// 2. bq.addInternal: registers the callback if the replicaItem has not been
// added to bq.mu.replicas yet.
// Note that the contract here is tricky, so adding new uses is discouraged. See
// the comment on processCallback for more details.
func (i *replicaItem) registerCallback(cb processCallback) {
i.callbacks = append(i.callbacks, cb)
}
@@ -204,8 +259,13 @@ func (pq *priorityQueue) update(item *replicaItem, priority float64) {
}

var (
errQueueDisabled = errors.New("queue disabled")
errQueueStopped = errors.New("queue stopped")
errQueueDisabled = errors.New("queue disabled")
errQueueStopped = errors.New("queue stopped")
errReplicaNotInitialized = errors.New("replica not initialized")
errReplicaAlreadyProcessing = errors.New("replica already processing")
errReplicaAlreadyInPurgatory = errors.New("replica in purgatory")
errReplicaAlreadyInQueue = errors.New("replica already in queue")
errDroppedDueToFullQueueSize = errors.New("queue full")
)

func isExpectedQueueError(err error) bool {
@@ -571,30 +631,37 @@ func (h baseQueueHelper) MaybeAdd(
h.bq.maybeAdd(ctx, repl, now)
}

func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio float64) {
_, err := h.bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), prio)
func (h baseQueueHelper) Add(
ctx context.Context, repl replicaInQueue, prio float64, processCallback processCallback,
) {
_, err := h.bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), prio, processCallback)
if err != nil && log.V(1) {
log.Dev.Infof(ctx, "during Add: %s", err)
}
}

type queueHelper interface {
MaybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp)
Add(ctx context.Context, repl replicaInQueue, prio float64)
Add(ctx context.Context, repl replicaInQueue, prio float64, processCallback processCallback)
}

// baseQueueAsyncRateLimited indicates that the base queue async task was rate
// limited and the task was not executed.
var baseQueueAsyncRateLimited = errors.Newf("async task rate limited")

// Async is a more performant substitute for calling AddAsync or MaybeAddAsync
// when many operations are going to be carried out. It invokes the given helper
// function in a goroutine if semaphore capacity is available. If the semaphore
// is not available, the 'wait' parameter decides whether to wait or to return
// as a noop. Note that if the system is quiescing, fn may never be called in-
// dependent of the value of 'wait'.
// is at capacity, the 'wait' parameter determines whether to block until
// capacity becomes available or return immediately with an error. Note that if
// the system is shutting down, the function may not be executed regardless of
// the 'wait' value.
//
// The caller is responsible for ensuring that opName does not contain PII.
// (Best is to pass a constant string.)
func (bq *baseQueue) Async(
ctx context.Context, opName string, wait bool, fn func(ctx context.Context, h queueHelper),
) {
) error {
if log.V(3) {
log.Dev.InfofDepth(ctx, 2, "%s", redact.Safe(opName))
}
@@ -609,12 +676,13 @@ func (bq *baseQueue) Async(
if bq.addLogN.ShouldLog() {
log.Dev.Infof(ctx, "rate limited in %s: %s", redact.Safe(opName), err)
}
return
return baseQueueAsyncRateLimited
}
go func(ctx context.Context) {
defer hdl.Activate(ctx).Release(ctx)
fn(ctx, baseQueueHelper{bq})
}(bgCtx)
return nil
}

// MaybeAddAsync offers the replica to the queue. The queue will only process a
@@ -623,17 +691,30 @@ func (bq *baseQueue) Async(
func (bq *baseQueue) MaybeAddAsync(
ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp,
) {
bq.Async(ctx, "MaybeAdd", false /* wait */, func(ctx context.Context, h queueHelper) {
_ = bq.Async(ctx, "MaybeAdd", false /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}

// AddAsyncWithCallback is the same as AddAsync, but allows the caller to
// register a process callback that will be invoked when the replica is
// enqueued or processed.
func (bq *baseQueue) AddAsyncWithCallback(
ctx context.Context, repl replicaInQueue, prio float64, processCallback processCallback,
) {
if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
h.Add(ctx, repl, prio, processCallback)
}); err != nil {
processCallback.onEnqueueResult(-1 /*indexOnHeap*/, err)
}
}
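
A hedged sketch of how a caller, in the spirit of the decommissioning nudger
mentioned above, might use AddAsyncWithCallback to block until the replica has
been processed or the enqueue has failed. The replicaEnqueuer interface,
fakeQueue, and enqueueAndWait are illustrative stand-ins rather than types from
this package, and the real method also takes a replicaInQueue.

package main

import (
	"context"
	"fmt"
)

// processCallback mirrors the struct introduced in this change.
type processCallback struct {
	onProcessResult func(err error)
	onEnqueueResult func(indexOnHeap int, err error)
}

// replicaEnqueuer is an illustrative stand-in for the slice of baseQueue used
// here; the real AddAsyncWithCallback also takes a replicaInQueue.
type replicaEnqueuer interface {
	AddAsyncWithCallback(ctx context.Context, prio float64, cb processCallback)
}

// enqueueAndWait enqueues once and blocks until a process result or an enqueue
// failure arrives. The channel is buffered and sends are non-blocking because
// the hooks may fire again after we have stopped listening.
func enqueueAndWait(ctx context.Context, q replicaEnqueuer, prio float64) error {
	done := make(chan error, 1)
	report := func(err error) {
		select {
		case done <- err:
		default: // a result is already pending; drop the extra signal
		}
	}
	q.AddAsyncWithCallback(ctx, prio, processCallback{
		onEnqueueResult: func(_ int, err error) {
			if err != nil {
				report(err)
			}
		},
		onProcessResult: report,
	})
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// fakeQueue simulates a queue that enqueues and then processes successfully.
type fakeQueue struct{}

func (fakeQueue) AddAsyncWithCallback(ctx context.Context, prio float64, cb processCallback) {
	go func() {
		cb.onEnqueueResult(0, nil) // landed at the top of the heap
		cb.onProcessResult(nil)    // processing succeeded
	}()
}

func main() {
	fmt.Println(enqueueAndWait(context.Background(), fakeQueue{}, 1.0)) // prints: <nil>
}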

// AddAsync adds the replica to the queue. Unlike MaybeAddAsync, it will wait
// for other operations to finish instead of turning into a noop (because
// unlike MaybeAdd, Add is not subject to being called opportunistically).
func (bq *baseQueue) AddAsync(ctx context.Context, repl replicaInQueue, prio float64) {
bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
h.Add(ctx, repl, prio)
_ = bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
h.Add(ctx, repl, prio, noopProcessCallback)
})
}

@@ -695,7 +776,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
return
}
}
_, err = bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
_, err = bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority, noopProcessCallback)
if !isExpectedQueueError(err) {
log.Dev.Errorf(ctx, "unable to add: %+v", err)
}
@@ -705,20 +786,26 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
// the replica is already queued at a lower priority, updates the existing
// priority. Expects the queue lock to be held by caller.
func (bq *baseQueue) addInternal(
ctx context.Context, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, priority float64,
ctx context.Context,
desc *roachpb.RangeDescriptor,
replicaID roachpb.ReplicaID,
priority float64,
processCallback processCallback,
) (bool, error) {
// NB: this is intentionally outside of bq.mu to avoid having to consider
// lock ordering constraints.
if !desc.IsInitialized() {
// We checked this above in MaybeAdd(), but we need to check it
// again for Add().
return false, errors.New("replica not initialized")
processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaNotInitialized)
return false, errReplicaNotInitialized
}

bq.mu.Lock()
defer bq.mu.Unlock()

if bq.mu.stopped {
processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errQueueStopped)
return false, errQueueStopped
}

@@ -731,12 +818,14 @@ func (bq *baseQueue) addInternal(
if log.V(3) {
log.Dev.Infof(ctx, "queue disabled")
}
processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errQueueDisabled)
return false, errQueueDisabled
}
}

// If the replica is currently in purgatory, don't re-add it.
if _, ok := bq.mu.purgatory[desc.RangeID]; ok {
processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaAlreadyInPurgatory)
return false, nil
}

@@ -746,6 +835,7 @@ func (bq *baseQueue) addInternal(
if item.processing {
wasRequeued := item.requeue
item.requeue = true
processCallback.onEnqueueResult(-1 /*indexOnHeap*/, errReplicaAlreadyProcessing)
return !wasRequeued, nil
}

@@ -756,6 +846,9 @@ func (bq *baseQueue) addInternal(
if log.V(1) {
log.Dev.Infof(ctx, "updating priority: %0.3f -> %0.3f", item.priority, priority)
}
// TODO(wenyihu6): will this introduce a lot of new memory allocation?
processCallback.onEnqueueResult(-1, /*indexOnHeap*/
errors.Wrapf(errReplicaAlreadyInQueue, "priority=%.3f->%.3f", item.priority, priority))
bq.mu.priorityQ.update(item, priority)
}
return false, nil
@@ -765,6 +858,7 @@ func (bq *baseQueue) addInternal(
log.Dev.Infof(ctx, "adding: priority=%0.3f", priority)
}
item = &replicaItem{rangeID: desc.RangeID, replicaID: replicaID, priority: priority}
item.registerCallback(processCallback)
bq.addLocked(item)

// If adding this replica has pushed the queue past its maximum size, remove
@@ -776,6 +870,11 @@ func (bq *baseQueue) addInternal(
replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
priority, replicaItemToDrop.replicaID)
// TODO(wenyihu6): when we introduce a base queue max size cluster setting,
// remember to invoke this callback when shrinking the size.
for _, cb := range replicaItemToDrop.callbacks {
cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize)
}
bq.removeLocked(replicaItemToDrop)
}
// Signal the processLoop that a replica has been added.
@@ -784,26 +883,41 @@ func (bq *baseQueue) addInternal(
default:
// No need to signal again.
}
// Note: the item may already have been dropped, or may be dropped later.
processCallback.onEnqueueResult(item.index /*indexOnHeap*/, nil)
return true, nil
}
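
addInternal reports these outcomes to onEnqueueResult using the sentinel errors
defined earlier. As an illustration only, a hypothetical handler could branch
on them with errors.Is; classifyEnqueueResult below is not part of this change,
and the sentinels are re-declared locally so the snippet builds outside
pkg/kv/kvserver.

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package-level sentinels above, so this builds on
// its own.
var (
	errReplicaAlreadyInQueue     = errors.New("replica already in queue")
	errDroppedDueToFullQueueSize = errors.New("queue full")
)

// classifyEnqueueResult is an illustrative helper showing how a callback could
// interpret the result of an enqueue attempt.
func classifyEnqueueResult(indexOnHeap int, err error) string {
	switch {
	case err == nil:
		return fmt.Sprintf("enqueued near heap position %d", indexOnHeap)
	case errors.Is(err, errDroppedDueToFullQueueSize):
		return "dropped: queue at max size"
	case errors.Is(err, errReplicaAlreadyInQueue):
		// addInternal wraps this sentinel with the old and new priorities, so
		// errors.Is still matches the wrapped error.
		return "already queued; priority may have been updated"
	default:
		return "not enqueued: " + err.Error()
	}
}

func main() {
	fmt.Println(classifyEnqueueResult(2, nil))
	fmt.Println(classifyEnqueueResult(-1, errDroppedDueToFullQueueSize))
}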

// MaybeAddCallback adds a callback to be called when the specified range
// finishes processing if the range is in the queue. If the range is in
// purgatory, the callback is called immediately with the purgatory error. If
// the range is not in the queue (either waiting or processing), the method
// returns false.
// finishes processing. The replica can be in one of several states:
//
// - waiting: not in mu.replicas
// Returns false and no callback is executed.
//
// - queued: in mu.replicas and mu.priorityQ
// Returns true and callback is executed when the replica is processed.
//
// - purgatory: in mu.replicas and mu.purgatory
// Returns true and the callback is called immediately with the purgatory error.
// Note that the callback may be invoked again when the purgatory finishes
// processing the replica.
//
// NB: If the replica this attaches to is dropped from an overfull queue, this
// callback is never called. This is surprising, but the single caller of this
// is okay with these semantics. Adding new uses is discouraged without cleaning
// up the contract of this method, but this code doesn't lend itself readily to
// upholding invariants so there may need to be some cleanup first.
// - processing: only in mu.replicas and currently being processed
// Returns true and callback is executed when processing completes. If the
// replica is currently being processed by the purgatory queue, it will not
// be in bq.mu.purgatory and the callback will only execute when the purgatory
// finishes processing the replica.
//
// NB: Adding new uses is discouraged without cleaning up the contract of
// processCallback. For example, removeFromReplicaSetLocked may be called
// without invoking these callbacks. See replicaItem.registerCallback for more
// details.
func (bq *baseQueue) MaybeAddCallback(rangeID roachpb.RangeID, cb processCallback) bool {
bq.mu.Lock()
defer bq.mu.Unlock()

if purgatoryErr, ok := bq.mu.purgatory[rangeID]; ok {
cb(purgatoryErr)
cb.onProcessResult(purgatoryErr)
return true
}
if item, ok := bq.mu.replicas[rangeID]; ok {
@@ -1173,7 +1287,7 @@ func (bq *baseQueue) finishProcessingReplica(

// Call any registered callbacks.
for _, cb := range callbacks {
cb(err)
cb.onProcessResult(err)
}

// Handle failures.
@@ -1192,7 +1306,7 @@ func (bq *baseQueue) finishProcessingReplica(
// purgatory.
if purgErr, ok := IsPurgatoryError(err); ok {
bq.mu.Lock()
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr, priority /*priorityAtEnqueue*/)
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr, priority /*priorityAtEnqueue*/, callbacks /*processCallback*/)
bq.mu.Unlock()
return
}
@@ -1217,6 +1331,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
repl replicaInQueue,
purgErr PurgatoryError,
priorityAtEnqueue float64,
processCallback []processCallback,
) {
bq.mu.AssertHeld()

@@ -1240,7 +1355,14 @@ func (bq *baseQueue) addToPurgatoryLocked(
return
}

item := &replicaItem{rangeID: repl.GetRangeID(), replicaID: repl.ReplicaID(), index: -1, priority: priorityAtEnqueue}
item := &replicaItem{
rangeID: repl.GetRangeID(),
replicaID: repl.ReplicaID(),
index: -1,
priority: priorityAtEnqueue,
callbacks: processCallback,
}

bq.mu.replicas[repl.GetRangeID()] = item

defer func() {